时间:2021-07-01

1 · 2017年01月13日 小小的建议 没缩进不能看,不如贴gist 单纯的adb的封装有很多人写了 比如我(#厚脸皮) https://github.com/264768502/adb_wrapper 比如这贴: https://testerhome.com/topics/6938 如果要处理UI的话,其实有现成的,比如pyuiautomator或者Appium 96 yaboandriod · #2 · 2017年01月16日 controller代码 import xml.etree.ElementTree as ET import os import sys import subprocess as sp import time import logging import re import codecs import datetime import ui_hooks import socket import threading WINDOW_FACTOR = [index, text, resource-id, class, package, visible, content-desc, checkable, checked, clickable, enabled, focusable, focused, scrollable, long-clickable, password, bottom, selected, bounds] RETRY_MAX_TIME = 3 def get_data(address): result={} temp = ‘‘ f1 = open(address,r) temp = f1.read() result = eval(temp) f1.close() return result class Operator: def __init__(self,device_id=‘‘,log_class=None,log_path=‘‘,**kwargs): self.device_id = device_id self.log_class = log_class self.log_path = log_path self.current_path = os.path.dirname(__file__) self.home_dir = os.path.expanduser(~) if not os.path.isdir(self.log_path): try: os.makedirs(log_path) except OSError: pass self.failure_pic_location = os.path.join(self.log_path,screenshots) if not os.path.isdir(self.failure_pic_location): try: os.makedirs(self.failure_pic_location) except OSError: pass self.temporary = os.path.join(self.log_path,temp) if not os.path.isdir(self.temporary): try: os.makedirs(self.temporary) except OSError: pass self.hooks = ui_hooks.Hooks(self.device_id) def get_instance(self): op = Operator(self.device_id,self.log_class,self.log_path) return op def decode_utf(self,var): utype = codecs.lookup("utf-8") return utype.decode(var)[0] def update_dictory(self,dictory): keys_list = dictory.keys() values_list = dictory.values() temp={} for i in range(len(keys_list)): temp[keys_list[i]] = self.decode_utf(values_list[i]) return temp def search(self,**kwargs): global paras_dictory,retry_time temp = kwargs paras_dictory = self.update_dictory(temp) if paras_dictory.has_key("retry_time"): retry_time = paras_dictory["retry_time"] else: retry_time = None return self.judge(paras_dictory,retry_time) def judge(self,dic,retry_time): timer = 0 global xml_path xml_path = self.get_xml() if xml_path is None: print "Error : can not generate window dump file." panel = None return panel else: self.native_unlock_screen() if retry_time is not None: while 1: if timer < int(retry_time): if self.parse_window_dump(paras_dictory) is None: print "Warnning : there is no view or many views match your require, retry again!" self.little_swipe() time.sleep(1) timer+=1 self.judge(paras_dictory,retry_time) else: op = self.get_instance() panel = Panel(op) return panel break else: panel = None return panel else: if self.parse_window_dump(paras_dictory) is None: print "Warnning : there is no view or many views match your require, start to search hooks library...!" self.hooks.get_generators(self.get_all_nodes()) status = self.hooks.parse() # if status is True, it means find corresponding frame and click next button to skip this frame if status: time.sleep(1) self.judge(paras_dictory,retry_time).click() else: panel = None return panel else: op = self.get_instance() panel = Panel(op) return panel def get_intents_dictory(self): bluesea_dir = os.path.abspath(os.path.join(self.current_path,os.pardir)) app_intents_file_path = os.path.join(bluesea_dir,repository/intent_list.cfg) return get_data(app_intents_file_path) def hardware(self,**kwargs): executor = self.get_instance() dut = Device(executor) return dut def get_all_nodes(self): parser = ET.parse(xml_path) root = parser.getroot() node_instance_list = root.iter(node) return node_instance_list def get_scrollable_view(self): for ins in self.get_all_nodes(): if ins.attrib["scrollable"] == "true": bound = ins.get("bounds") break else: bound = None return bound def native_is_screen_lock(self): is_screen_lock = False for ins in self.get_all_nodes(): id_collector = ins.attrib[resource-id] if "id/lock_icon" in id_collector.strip(\n): is_screen_lock = True break return is_screen_lock def parse_window_dump(self,parametres=None): r1 = r[A-Za-z]+:id/(\w+) bound = None p_counter = [] bound_list = [] counter_id = 0 counter_index = 0 counter_desc = 0 counter_pack = 0 counter_text = 0 counter_enabled = 0 false_flag = 0 if parametres.has_key(id): view_id = parametres[id] p_counter.append(view_id) if parametres.has_key(text): view_text = parametres[text] p_counter.append(view_text) if parametres.has_key(description): view_description = parametres[description] p_counter.append(view_description) if parametres.has_key(index): view_index = parametres[index] p_counter.append(view_index) if parametres.has_key(package): view_package = parametres[package] p_counter.append(view_package) if parametres.has_key(enabled): view_enabled = parametres[enabled] p_counter.append(view_enabled) if len(p_counter) == 0: print "Error : invalid input, there is no useful parameter given!!!" sys.exit(-1) for factor in self.get_all_nodes(): id_collector = factor.attrib[resource-id] text_collector = factor.attrib[text] index_collector = factor.attrib[index] desc_collector = factor.attrib[content-desc] pack_collector = factor.attrib[package] enabled_collector = factor.attrib[enabled] for your_input in p_counter: id_strs = re.findall(r1,id_collector.strip(\n)) id_string = id_strs if not id_strs == []: id_string=id_strs[0] else: id_string = id_collector.strip(\n\r) if your_input == id_string: bound_id = factor.get("bounds") counter_id +=1 if your_input == text_collector.strip(\n): bound_text = factor.get("bounds") counter_text +=1 if your_input == index_collector.strip(\n): bound_index = factor.get("bounds") counter_index +=1 if your_input == desc_collector.strip(\n): bound_desc = factor.get("bounds") counter_desc +=1 if your_input == pack_collector.strip(\n): bound_pack = factor.get("bounds") counter_pack +=1 if your_input == enabled_collector.strip(\n): bound_enabled = factor.get("bounds") counter_enabled +=1 if counter_id == 0: false_flag += 1 if counter_index == 0: false_flag += 1 if counter_pack == 0: false_flag += 1 if counter_desc == 0: false_flag += 1 if counter_text == 0: false_flag += 1 if counter_enabled == 0: false_flag += 1 if counter_id == 1: bound_list.append(bound_id) if counter_text == 1: bound_list.append(bound_text) if counter_desc == 1: bound_list.append(bound_desc) if counter_pack == 1: bound_list.append(bound_pack) if counter_index == 1: bound_list.append(bound_index) if counter_enabled == 1: bound_list.append(bound_enabled) if false_flag <= 6 - len(p_counter): if len(bound_list) == 0: bound = None elif len(bound_list) == 1: bound = bound_list[0] elif len(bound_list) == 2: temp_bound = bound_list[0] if bound_list[1] == temp_bound: bound = temp_bound else: bound = None elif len(bound_list) > 2: temp_bound = bound_list[0] for bo in range(1,len(bound_list)): if bound_list[bo] != temp_bound: bound = None break else: bound = temp_bound else: bound = None return bound def get_xml(self): retry_count = 0 while 1: if retry_count < RETRY_MAX_TIME: self.adb_root() shell_cmd = self.adb_shell() dump_xml_command = shell_cmd + uiautomator dump /sdcard/window_dump.xml self.execute_command(dump_xml_command).wait() xml_file_on_device_path = /sdcard/window_dump.xml xml_file_on_server_path = self.temporary xml_file_origin = os.path.join(xml_file_on_server_path,window_dump.xml) xml_new_name = self.device_id + _ + window_dump.xml xml_file = os.path.join(xml_file_on_server_path,xml_new_name) get_xml_command = adb -s + self.device_id + pull + xml_file_on_device_path + + xml_file_on_server_path self.execute_command(get_xml_command).wait() try: os.rename(xml_file_origin,xml_file) except OSError as oer: print oer self.get_xml() if os.path.isfile(xml_file): return xml_file break else: retry_count += 1 else: print "Unkown issue of adb, uiautomator dump file failed!" break return def adb_root(self): root_command = adb -s + self.device_id + root self.execute_command(root_command).wait() def generate_folder_locate_picture(self): address_picture = self.failure_pic_location if not os.path.isdir(address_picture): try: os.makedirs(address_picture) except OSError: pass return address_picture def generate_screenshot_name_format(self): dt = datetime.datetime.now().strftime(%Y_%m_%d_%H_%M_%S) file_name = screenshot_ + dt + .png return file_name def adb_command(self): adb_command = adb -s + self.device_id + return adb_command def adb_shell(self): adb_shell_command = adb -s + self.device_id + shell return adb_shell_command def execute_command(self,cmd,ignore_print=True): ccmd = cmd if ignore_print: self.log_class.info("Execute command : {0}".format(ccmd)) else: pass proc = sp.Popen(ccmd.split( ),stdout=sp.PIPE) return proc def check_adb_works(self,proc): pass def get_dumpsys_display(self): dumpsys_display_command = self.adb_shell() + dumpsys display out = self.execute_command(dumpsys_display_command) lines = out.stdout.read().split(\n) return lines def bound_to_list(self): s = self.parse_window_dump(paras_dictory) temp = s.replace([,‘‘).replace(],,).split(,) temp.remove(‘‘) return temp def get_centre_coordinate(self): c_list = self.bound_to_list() point_c = [] co_x1 = c_list[0] co_x2 = c_list[2] point_c.append(str((int(co_x1)+int(co_x2))/2)) co_y1 = c_list[1] co_y2 = c_list[3] point_c.append(str((int(co_y1)+int(co_y2))/2)) return point_c def native_power_on(self): if not self.native_check_screen_on(): power_on_command = self.adb_shell() + input keyevent 26 self.execute_command(power_on_command).wait() def native_unlock_screen(self): self.native_power_on() if self.native_is_screen_lock(): self.native_push_up() def native_check_screen_on(self): is_screen_on = False for line in self.get_dumpsys_display(): if mScreenState in line and ON in line: is_screen_on = True return is_screen_on def native_push_up(self): device_resolution = self.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==unknow or max_h ==unknow: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/2) x2 = str(int(max_w)/2) y1 = str((int(max_h)*4)/5) y2 = str(int(max_h)/5) push_up_command = self.adb_shell() + input swipe + x1 + + y1 + + x2 + + y2 self.execute_command(push_up_command).wait() def get_max_resolution(self): max_resolution = [] for line in self.get_dumpsys_display(): if mDisplayWidth in line: max_resolution.append(line.strip(\r).split(=)[-1]) if mDisplayHeight in line: max_resolution.append(line.strip(\r).split(=)[-1]) return max_resolution def take_snapshot(self): global xml_path xml_path=self.get_xml() file_path = self.generate_folder_locate_picture() picture_name = self.generate_screenshot_name_format() uiautomator_dump_file_name = picture_name.split(.)[0] + .uix uiautomator_dump_file = os.path.join(file_path,uiautomator_dump_file_name) file_path_device = os.path.join(/sdcard/ + picture_name) take_snapshot_command = self.adb_shell() + /system/bin/screencap -p + file_path_device pull_snapshot_command = adb -s + self.device_id + pull + file_path_device + + file_path get_uiautomator_dump_command = self.adb_shell() + uiautomator dump pull_uiautomator_dump_command = adb -s + self.device_id + pull /sdcard/window_dump.xml + + uiautomator_dump_file self.native_unlock_screen() time.sleep(0.5) self.execute_command(take_snapshot_command).wait() self.execute_command(pull_snapshot_command).wait() self.execute_command(get_uiautomator_dump_command).wait() self.execute_command(pull_uiautomator_dump_command).wait() def little_swipe(self): device_resolution_2 = self.get_max_resolution() max_w_2 = device_resolution_2[0] max_h_2 = device_resolution_2[1] x1_2 = str(int(max_w_2)/2) x2_2 = str(int(max_w_2)/2) y2_2 = str((int(max_h_2)*2)/5) y1_2 = str((int(max_h_2)*4)/5) little_swipe_command = self.adb_shell() + input swipe + x1_2 + + y1_2 + + x2_2 + + y2_2 self.execute_command(little_swipe_command).wait() class Device(object): def __init__(self,executor): self.executor = executor def return_home(self): go_home_command = self.executor.adb_shell() + input keyevent 3 self.executor.execute_command(go_home_command).wait() def press_enter(self): press_enter_command = self.executor.adb_shell() + input keyevent 66 self.executor.execute_command(press_enter_command).wait() def press_menu(self): press_menu_command = self.executor.adb_shell() + input keyevent 82 self.executor.execute_command(press_menu_command).wait() def shift_tab(self,counter=1): tab_key = 61 shift_tab_command = self.executor.adb_shell() + input keyevent + (tab_key*int(counter)).strip() self.executor.execute_command(shift_tab_command).wait() def shift_down(self,counter=1): down_key = 20 press_down_command = self.executor.adb_shell() + input keyevent + (down_key*int(counter)).strip() self.executor.execute_command(press_down_command).wait() def shift_up(self,counter=1): up_key = 19 press_up_command = self.executor.adb_shell() + input keyevent + (up_key*int(counter)).strip() self.executor.execute_command(press_up_command).wait() def shift_left(self,counter=1): press_left_command = self.executor.adb_shell() + input keyevent 21 for j in range(int(counter)): self.executor.execute_command(press_left_command).wait() time.sleep(1) def volume_up(self,counter=1): volume_up_command = self.executor.adb_shell() + input keyevent 24 for j in range(int(counter)): self.executor.execute_command(volume_up_command).wait() time.sleep(0.5) def volume_down(self,counter=1): volume_down_command = self.executor.adb_shell() + input keyevent 25 for j in range(int(counter)): self.executor.execute_command(volume_down_command).wait() time.sleep(0.5) def shift_right(self,counter=1): press_right_command = self.executor.adb_shell() + input keyevent 22 for k in range(int(counter)): self.executor.execute_command(press_right_command).wait() time.sleep(1) def push_up(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==unknow or max_h ==unknow: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/2) x2 = str(int(max_w)/2) y1 = str((int(max_h)*4)/5) y2 = str(int(max_h)/5) push_up_command = self.executor.adb_shell() + input swipe + x1 + + y1 + + x2 + + y2 self.executor.execute_command(push_up_command).wait() def swipe_right(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==unknow or max_h ==unknow: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/5) x2 = str((int(max_w)*4)/5) y1 = str(int(max_h)/2) y2 = str(int(max_h)/2) swipe_right_command = self.executor.adb_shell() + input swipe + x1 + + y1 + + x2 + + y2 self.executor.execute_command(swipe_right_command).wait() def check_boot_status(self): is_booted = False check_boot_status_command = self.executor.adb_shell() + getprop grep_command = grep bootanim p5 = sp.Popen(check_boot_status_command.split( ),stdout=sp.PIPE) p6 = sp.Popen(grep_command.split( ),stdin=p5.stdout,stdout=sp.PIPE) out = p6.stdout.read().strip(\n) if "stopped" and 1 in out: is_booted = True return is_booted def check_reboot_progress(self): reboot_execute_success = False if not self.check_boot_status(): reboot_execute_success = True return reboot_execute_success def reboot_device(self): reboot_device_command = self.executor.adb_command() + reboot self.executor.execute_command(reboot_device_command).wait() def input_text(self,string=%s): string = string.strip().replace( ,%s) input_text_command = self.executor.adb_shell() + input text + string self.executor.execute_command(input_text_command).wait() def swipe_left(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==unknow or max_h ==unknow: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str((int(max_w)*4)/5) x2 = str(int(max_w)/5) y1 = str(int(max_h)/2) y2 = str(int(max_h)/2) swipe_left_command = self.executor.adb_shell() + input swipe + x1 + + y1
