破大防!这个开源库,竟能让APP日常任务自动化变得如此简单
0x1、引言
恰逢某电商618活动,前两天写了篇 《节约"阳寿"——某电商618活动自动化》,脚本这两天都有挂,偶尔没事就会优化下,但始终还是觉得有些美中不足。昨天下班路上,还想着重构下,随手列了下思路:
实际运行中,通过图片相似度判定的方案并 不太可靠,经常出现误判,比如:
任务描述截图是这个:
而品牌墙任务对应的截图应该是这个:
这样都能有0.76的相似度,不过,其实也 河里,均值哈希算法比对的是:两幅图灰度后像素的平均值。
除此之外还有个大问题:动态计算任务描述文字区域非常繁琐,不同手机的分辨率和屏幕密度不同,对应的实际坐标点也不同。调整计算方式多次依旧效果甚微,后面放弃动态算了,直接写死特定设备的点击区域坐标~
昨晚睡前想了想,还是得走OCR,要么用靠谱的 第三方库 (训练好的),要么注册多几个OCR平台,轮流白嫖,并想办法减少重复识别 (搭配图片相似度判定,结果复用)。
0x2、库体验
演示地址502了,看截图也不是很清楚,不过看着很牛批啊,直接clone一波,然后cd到目录下,输入启动命令:
python backend/main.py
当然,一般是运行不起来的,相关依赖都没有装,报缺啥,你就pip装啥,比如笔者依次就装了这些:
# Python 高性能Web框架
pip install tornado
# opencv → cv2
pip install opencv-python
# ONNX格式的机器学习模型的高性能推理引擎
pip install onnxruntime
# 小型动态图形计算库,将输入的图形路径进行处理
pip install pyclipper
# 空间几何对象库,支持点线面等集合对象及相关空间操作
pip install shapely
该装的都装好了,在此执行运行命令,终端最后会输出一个内网的ip地址:
复制到浏览器打开,然后选择一张图片传入,接着点击识别,静待识别完成:
这识别效果,我直接 震惊!!!
识别率高不说,连文字区域的坐标也给出来了,你知道这意味着什么吗?
不用自己再去计算裁剪区域,只需做下文本匹配,就可以知道什么类型的任务。
真 · 起飞,这不得赶紧安排一波~
0x3、直接开搞
① 模拟上传 & 结果解析
就是要在Python代码里调用API接口,上传图片进行识别,然后解析识别结果。直接打开Chrome,F12抓包,上传图片,点击识别:
multipart/form-data
上传文件的一种方式,可以理解为 多部分表单数据,就好像平时写邮件时经常会加上附件,而附件也是用表单添加。看下具体提交的表单数据:
可以,使用requests来模拟请求,根据请求参数写出下述代码:
import socket
import requests as r
local_ocr_base_url = "http://{}:8089".format(socket.gethostbyname(socket.gethostname()))
local_ocr_tr_run_url = local_ocr_base_url + "/api/tr-run/"
def picture_local_ocr(pic_path):
upload_files = {'file': open(pic_path, 'rb'), 'compress': 960}
# 发送请求会自动加上Content-Type,不要手多加上,加了会报错
resp = r.post(local_ocr_tr_run_url, files=upload_files)
print(resp.text)
if __name__ == '__main__':
picture_local_ocr('test.jpg')
运行结果如下 (输出结果复制到Json格式化工具中):
可以,解析一波数据,返回格式:文字 : (x起始坐标,y起始坐标,x结束坐标,y结束坐标)
def extract_text(origin_data_dict):
text_dict = {}
raw_out = origin_data_dict['data']['raw_out']
if raw_out is not None:
for raw in raw_out:
text_dict[raw[1]] = (raw[0][0][0], raw[0][0][1], raw[0][17][0], raw[0][18][1])
return text_dict
else:
print("Json数据解析异常")
打印提取后的结果如下:
② 编写任务
可以,非常完美,接着就是定义各种类型任务的处理了,具体代码如下:
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import random
import cp_utils
class Task:
def __init__(self, to_finish_node=None, task_name=None, logger=None):
self.to_finish_node = to_finish_node
self.task_name = task_name
self.logger = cp_utils.default_logger() if logger is None else logger
self.po = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
def start(self):
self.logger.info("任务【{}】执行开始".format(self.task_name))
self.doing()
self.logger.info("任务【{}】执行结束".format(self.task_name))
def doing(self):
# 具体要完成的任务
pass
def to_finish_position(self):
sleep(1)
return (self.to_finish_node[0] + random.randint(10, self.to_finish_node[2] - self.to_finish_node[0]),
self.to_finish_node[1] + random.randint(10, self.to_finish_node[3] - self.to_finish_node[1]))
def browser_8s(self):
task_flag = exists(Template(r"8s_task_flag.png", record_pos=(-0.383, -0.431), resolution=(1080, 2160)))
if task_flag:
sleep(10)
keyevent("KEYCODE_BACK")
else:
sleep(20)
keyevent("KEYCODE_BACK")
class BrowserTask(Task):
def __init__(self):
super().__init__(task_name="浏览可得3000金币")
def doing(self):
touch(self.to_finish_position())
sleep(2)
keyevent("KEYCODE_BACK")
class SmallAppTask(Task):
def __init__(self):
super().__init__(task_name="去参与小程序活动可得8000金币")
def doing(self):
sleep(3)
activity_infos = shell('dumpsys activity top | grep ACTIVITY')
activity_pos = activity_infos.find("某东包名")
if activity_pos == -1:
start_app("com.jingdong.app.mall")
sleep(2)
class BrowseAttention8sTask(Task):
def __init__(self):
super().__init__(task_name="浏览并关注8s可得8000金币")
def doing(self):
touch(self.to_finish_position())
self.browser_8s()
class Browse8sTask(Task):
def __init__(self):
super().__init__(task_name="浏览8s可得7000金币")
def doing(self):
touch(self.to_finish_position())
self.browser_8s()
class Browser4Commodity(Task):
def __init__(self):
super().__init__(task_name="累计浏览4个商品可得5000金币")
# 点我浏览的坐标点
self.click_pos_tuple = (
(366, 1115),
(922, 1117),
(394, 1822),
(911, 1872),
)
def doing(self):
touch(self.to_finish_position())
# 静待片刻等待加载完毕
sleep(3)
for click_pos in self.click_pos_tuple:
# 坐标加上随机数,不然每次点同一个位置,太假了
touch((click_pos[0] + random.randint(0, 10), click_pos[1] + random.randint(0, 10)))
sleep(2)
keyevent("KEYCODE_BACK")
sleep(2)
keyevent("KEYCODE_BACK")
class AddOnBrowser4Commodity(Task):
def __init__(self):
super().__init__(task_name="累计浏览并加购4个商品可得4000金币")
# 点我浏览的坐标点
self.click_pos_tuple = (
(366, 1115),
(922, 1117),
(394, 1822),
(911, 1872),
)
def doing(self):
touch(self.to_finish_position())
# 静待片刻等待加载完毕
sleep(3)
for click_pos in self.click_pos_tuple:
# 坐标加上随机数,不然每次点同一个位置,太假了
touch((click_pos[0] + random.randint(0, 10), click_pos[1] + random.randint(0, 10)))
sleep(2)
keyevent("KEYCODE_BACK")
sleep(2)
keyevent("KEYCODE_BACK")
class JoinAndBrowser(Task):
def __init__(self):
super().__init__(task_name="成功入会并浏览可得3000-8000金币")
def doing(self):
touch(self.to_finish_position())
sleep(2)
keyevent("KEYCODE_BACK")
class FocusOnAndBrowser(Task):
def __init__(self):
super().__init__(task_name="浏览并关注可得3000金币")
def doing(self):
touch(self.to_finish_position())
sleep(2)
keyevent("KEYCODE_BACK")
class Browser2000Order10000Task(Task):
def __init__(self):
super().__init__(task_name="下单再得10000金币")
def doing(self):
touch(self.to_finish_position())
sleep(2)
keyevent("KEYCODE_BACK")
class InviteTask(Task):
def __init__(self):
super().__init__(task_name="邀请任务")
def doing(self):
根据实际情况对任务进行补全和调整即可~
③ 任务判定
接着就是任务判定的相关逻辑了:
from jd_task import *
import ocr_utils
import cp_file_utils
import re
import difflib
# 手机设备id,通过adb devices可以获取到
device_id = 'xxx'
# 临时图片保存路径
temp_save_dir = os.path.join(os.getcwd(), "log")
# 日志工具初始化
logger = cp_utils.logging_init()
# 匹配任务描述的正则
task_desc_pattern = re.compile(r"可得.*?金", re.S)
# 匹配任务计数器的正则
task_counter_pattern = re.compile(r"(\d)/(\d)", re.S)
# 标记
no_task_flag = False # 用来检测任务是否都完成的标记
# 一些初始化工作
def init():
cp_file_utils.is_dir_existed(temp_save_dir, True, True)
# 连接设备
auto_setup(__file__, logdir=True, devices=["android://127.0.0.1:5037/{}".format(device_id)])
logger.info("初始化完成...")
# 初始化任务状态
def task_status():
global no_task_flag
# 生成截图
snapshot_path = os.path.join(temp_save_dir, snapshot()['screen'])
# 进行文字识别
ocr_dict = ocr_utils.picture_local_ocr(snapshot_path)
# 依次保存:去完成、任务描述、任务数列表
to_finish_node_list = []
task_desc_list = []
task_cur_sum_list = []
for ocr_key in ocr_dict.keys():
if "去完成" in ocr_key:
to_finish_node_list.append({ocr_key: ocr_dict[ocr_key]})
elif task_desc_pattern.search(ocr_key) is not None:
task_desc_list.append({ocr_key: ocr_dict[ocr_key]})
else:
task_cur_sum_result = task_counter_pattern.search(ocr_key)
if task_cur_sum_result is not None:
task_cur_sum_list.append({task_cur_sum_result: ocr_dict[ocr_key]})
# 根据Y周差值绝对值<80,将三者关联
task_list = [] # 任务列表
for to_finish_node in to_finish_node_list:
node = list(to_finish_node.values())[0]
task_wrapper = TaskWrapper(node)
for task_desc in task_desc_list:
if abs(list(task_desc.values())[0][20] - node[1]) < 80:
task_wrapper.task_desc = list(task_desc.keys())[0].split("、")[-1].lstrip()
break
for task_cur_sum in task_cur_sum_list:
if abs(list(task_cur_sum.values())[0][21] - node[1]) < 80:
task_wrapper.cur_count = list(task_cur_sum.keys())[0].group(1)
task_wrapper.sum_count = list(task_cur_sum.keys())[0].group(2)
break
task_result = task_wrapper.generate_task_list()
if task_result is not None:
task_list += task_result
task_list_len = len(task_list)
logger.info("待完成任务数:{}".format(len(task_list)))
if task_list_len == 0:
if no_task_flag:
logger.info("所有任务已完成")
return
else:
logger.info("当前所有任务已完成,检测是否仍有新任务")
no_task_flag = True
task_status()
else:
for task in task_list:
task.start()
sleep(2)
logger.info("当前所有任务已完成,检测是否仍有新任务")
no_task_flag = False
task_status()
class TaskWrapper:
def __init__(self, to_finis_node=None, task_desc=None, cur_count=0, sum_count=0):
self.to_finis_node = to_finis_node
self.task_desc = task_desc
self.cur_count = cur_count
self.sum_count = sum_count
# 生成任务列表
def generate_task_list(self):
if self.sum_count == 0 or self.cur_count < self.sum_count:
if self.task_desc is None:
return []
task_type = None
if self.compare_desc("每邀1个好友可得10000金币"):
task_type = 'InviteTask()'
elif self.compare_desc("去参与小程序活动可得8000金币"):
task_type = 'SmallAppTask()'
elif self.compare_desc("浏览并关注8s可得8000金币"):
task_type = 'BrowseAttention8sTask()'
elif self.compare_desc("浏览8s可得7000金币"):
task_type = 'Browse8sTask()'
elif self.compare_desc("累计浏览4个商品可得5000金币"):
task_type = 'Browser4Commodity()'
elif self.compare_desc("累计浏览并加购4个商品可得4000金币"):
task_type = 'AddOnBrowser4Commodity()'
elif self.compare_desc("成功入会并浏览可得3000-8000金币"):
task_type = 'JoinAndBrowser()'
elif self.compare_desc("浏览并关注可得3000金币"):
task_type = 'FocusOnAndBrowserTask()'
elif self.compare_desc("浏览可得4000金币"):
task_type = 'ZhongCaoTask()'
elif self.compare_desc("浏览可得3000金币"):
task_type = 'BrowserTask()'
elif self.compare_desc("下单再得10000金币"):
task_type = 'Browser2000Order10000Task()'
task_list = []
task_count = int(self.sum_count) - int(self.cur_count)
if task_count <= 0 and int(self.sum_count) == 0:
task_count = 4
for i in range(0, task_count):
task = eval(task_type)
task.to_finish_node = self.to_finis_node
task_list.append(task)
return task_list
def compare_desc(self, target_desc):
return difflib.SequenceMatcher(None, self.task_desc, target_desc).quick_ratio() > 0.75
def show(self):
print("{}={}={}/{}".format(self.to_finis_node, self.task_desc, self.cur_count, self.sum_count))
if __name__ == '__main__':
init()
task_status()
运行看下效果:
是的,就是这么简单,此处应有掌声,通过hineseocr_lite这个开源库,即可轻松实现APP日常任务自动化,读者们还不赶紧试试么~
转载自:https://juejin.cn/post/7103410257740693511