前面写了关于整个云真机平台设计的帖子云真机平台的设计与实现,实际的框架实现起来涉及知识点还是很多的。
简介
前面写了关于整个云真机平台设计的帖子云真机平台的设计与实现,实际的框架实现起来涉及知识点还是很多的。参考了atx-android-provider的代码,拆分需要的方法实现了该模块,这篇算是对python+安卓设备provider实现的基本讲解。
实现效果
gif有点慢,主要展示的是设备接入后自动刷新列表,设备掉线后状态自动更新。
数据流程
获取依赖文件
安卓控制主要需要的是minitouch、minicap、atx-agent 这三个文件,由于不同设备的cpu架构不同,我们需要根据不同的设备cpu丢进去不同的执行文件。
1 2 3 4 5 6 7
| def _init_binaries(self): d = self._device sdk = d.getprop("ro.build.version.sdk") abi = d.getprop('ro.product.cpu.abi') abis = (d.getprop('ro.product.cpu.abilist').strip() or abi).split(",") logger.debug("%s sdk: %s, abi: %s, abis: %s", self, sdk, abi, abis)
|
官方npm库minicap-prebuilt有编译好的文件,atx的作者已经打包放到git上了,可以直接下载stf-binaries,通过adb push进手机。
在windows上开发时,发现从zipfile中 push 类似minicap
可执行文件时,部分机型会报错。catch报错后,将下载的zip文件给解压,直接从文件夹中调adb push。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| stf_zippath = fetching.get_stf_binaries() zip_folder, _ = os.path.splitext(os.path.basename(stf_zippath)) prefix = zip_folder + "/node_modules/minicap-prebuilt/prebuilt/" self._push_stf(prefix + abi + "/lib/android-" + sdk + "/minicap.so", "/data/local/tmp/minicap.so", mode=0o644, zipfile_path=stf_zippath) try: self._push_stf(prefix + abi + "/bin/minicap", "/data/local/tmp/minicap", zipfile_path=stf_zippath) except Exception as e: logger.warn("Device: %s push minicap fail", self._serial) realFile = "vendor/" + prefix + abi + "/bin/minicap" if os.path.isfile(realFile): self._device.sync.push(realFile, "/data/local/tmp/minicap") else: with zipfile.ZipFile(stf_zippath) as zFile: zFile.extractall(path='vendor/') self._device.sync.push(realFile, "/data/local/tmp/minicap")
|
设备监听服务
Provider是通过adb track_devices来监听设备的状态,同时会对新接入设备进行初始化检查,在此事件中,可以用来定制设备的过滤规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| async for event in adb.track_devices(): logger.debug("%s", event) if not allow_remote: if re.match(r"(\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)", event.serial): logger.debug("Skip remote device: %s", event) continue if event.present: try: udid = serial2udid[event.serial] = event.serial udid2serial[udid] = event.serial device = AndroidDevice(event.serial, partial(callback, udid)) await device.init() await device.open_identify() udid2device[udid] = device await serverCon.update_device({ "udid": udid, "platform": "android", "colding": False, "provider_name": provider_info()['name'], "provider_ip": provider_info()['ip'], "provider": device.addrs(), "properties": await device.properties(), }) logger.info("Device:%s is ready", event.serial) except RuntimeError: logger.warning("Device:%s initialize failed", event.serial) except Exception as e: logger.error("Unknown error: %s", e) traceback.print_exc() else: udid = serial2udid[event.serial] if udid in udid2device: udid2device[udid].close() udid2device.pop(udid, None) await serverCon.disconnect_device({ "udid": udid, })
|
长连接封装
因为要监听服务端一些辅助操作指令,所以采用的socket长连接,来与服务端通信。这里对服务进行了些封装方便调用扩展。
我的服务端是 flask-socket.io , 这里采用的python-socketio作为客户端。socket使用的坑很多,不细说了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| async def connect(self): cnt = 0 while True: try: socketIO = await self._connect() cnt = 0 return socketIO except Exception as e: cnt = min(30, cnt + 1) logger.warning("WS connect error: %s, reconnect after %ds", e, cnt + 1)
async def _connect(self): socketIO.connect('http://localhost:5000', namespaces=['/provider'])
async def update_device(self,data): data = json.dumps(data) self._socketIO.emit('update_device', data, '/provider')
async def disconnect_device(self,data): data = json.dumps(data) self._socketIO.emit('disconnect_device', data, '/provider')
|
辅助功能服务
因为minitouch只是提供了UI基于坐标的点击、滑动操作,而实际使用过程中,还需要一些手机的热键操作,如:电源键、home、back、menu、截图等操作。通过监听服务的广播事件,调用本地的adb去发送keyevent code来操作指定的设备。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @socketIO.on('provider_command') def provider_command(data): serial = data['params']['deviceId'] if data['command'] == 'rotate': rotate(serial, data['params']['orientation']) elif data['command'] == 'home': home(serial) elif data['command'] == 'menu': menu(serial) elif data['command'] == 'back': back(serial) elif data['command'] == 'power': power(serial) elif data['command'] == 'sendkeys': sendkeys(serial, data['params']['value'])
|
根据adb封装的辅助指令
例如:
1 2 3
| def home(serial): adbclient.shell(serial,"input keyevent HOME") logger.info('KEYCODE_HOME event %s to sucessful!'%(serial))
|