【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】
1. 需求分析
- 识别图片中的文字【采用百度的通用文字识别】;
- 文字筛选,按照分类获取对应的文本;
- 采用 openpyxl 实现将数据存入 excel 中。
2. 获取 access_token
- 获取本地缓存的 access_token;
- 如果本地缓存的 access_token 过期,从百度获取远程 access_token。
# 获取 access_token
def get_access_token():
access_token = get_local_access_token()
if access_token == False:
return get_request_access_token()
else:
return access_token
3. 本地 access_token
- 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token;
- 获取文件夹中的 access_token 文件,筛选文本文件,access_token 保存在文本文件中;
- 如果没有找到保存 access_token 的文件,返回 False;
- 获取文件名中的数字,进行排序,获取最后一个最新的文件;
- 获取存储token的时间;
- 由于token的有效时间是30天,因此判断29进行刷新;
- 获取token执行的当前时间;
- 判断如果超出有效期,重新刷新 access_token;
- 获取本地文件中缓存的 access_token。
# 获取本地的 access_token
def get_local_access_token():
# 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token
if not os.path.exists(f'./token/'):
os.makedirs(f'./token/')
return False
# 获取文件夹中的token
files = os.listdir("./token")
file_names = list(filter(lambda x : x.split('.').pop() in ['txt'], files))
# 如果没有找到保存 access_token 的文件,返回 False
if len(file_names) == 0:
return False
sort_names = list(sorted(file_names, key=lambda x:(int(re.sub('\D', '', x)),x)))
last_time_name = sort_names[-1]
# 存储token的时间
save_time = int(re.sub('\D', '', last_time_name))
# 由于token的有效时间是30天,因此判断29进行刷新
effective_time = 60 * 60 * 24 * 29
# 获取token执行的当前时间
current_time = int(time.time())
# 保存 access_token 的变量
access_token = ""
# 判断如果超出有效期,重新刷新 access_token
if current_time - save_time > effective_time:
return False
else:
# 获取本地文件中缓存的 access_token
with open("./token/" + last_time_name, "r",encoding="utf-8") as f:
access_token = f.read()
return access_token
4. 远程 access_token
- 使用在百度控制台获取的 key 和 secret,组装一个获取 access_token 的地址;
- 定义一个发起请求的 headers ;
- 使用 requests 发起请求,获取 access_token ;
- 获取返回结果【response】中的 access_token ;
- 判断 access_token 存在,就以当前时间戳为文件名,将 access_token 写入文件中;
- 返回获取的远程的 access_token;
- 如果 access_token 无效,输出报错,关闭弹窗。
# 获取百度的 access_token
def get_request_access_token():
url = f'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={key}&client_secret={secret}'
payload = ""
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
if response:
access_token = response.json().get('access_token')
if access_token:
with open("./token/" + str(int(time.time())) + ".txt", "w",encoding="utf-8") as f:
f.write(access_token)
return str(access_token)
else:
print(response.json())
print('无正确的 access_token 返回!!!')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
5. 获取所有需要识别的图片
- 获取文件夹下的所有文件名称;
- 过滤获取所有的图片文件名列表。
# 获取文件夹下所有图片文件名称
def get_all_image_names(path):
names = os.listdir(path)
image_names = list(filter(lambda x : x.split('.').pop() in ['jpg', 'png', 'jpeg', 'bmp'], names))
return image_names
6. 获取所有图片的图片信息
- 使用 all_list_data 存储返回图片的信息;
- 循环图片名列表,读取每一张图片的信息;
- 返回所有图片的信息。
# 获取所有图片的图片信息
def get_all_image_info(path, image_names, access_token):
all_list_data = []
for name in image_names:
all_list_data += get_image_to_text(access_token, f"{path}/{name}")
# print('all_list_data',all_list_data)
return all_list_data
7. 获取图片中的文字
- 设置采用识别文字的api接口;
- 读取图片,组装为请求参数;
- 用请求接口和 access_token 拼接为完整的获取图片中文本信息的接口;
- 使用 requests 发起请求,获取图片中识别的文本信息;
- 读取返回的文本结果 words_result;
- 如果 words_result 存在,使用 words_to_object 对每个返回结果进行处理;
- 否则输出错误,同时关闭窗口。
# 获取图片中的文字
def get_image_to_text(access_token, path):
request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"
# 二进制方式打开图片文件
with open(path, 'rb') as f:
img = base64.b64encode(f.read())
params = {"image":img}
request_url = request_url + "?access_token=" + access_token
headers = {'content-type': 'application/x-www-form-urlencoded'}
response = requests.post(request_url, data=params, headers=headers)
if response:
words_result = response.json().get("words_result")
if words_result:
print(f'图片:{path} 读取完成!')
return words_to_object(words_result)
time.sleep(0.5)
else:
print(response.json())
print('识别文字无返回!!!')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
8. 文本信息筛选
- 通过【收益率】获取需要数据的起始标识;
- 筛选获取【清仓次数】关键字,为当前次进入;
- 否则【清仓】存在,就是结束。
# 将文本信息列表转字典
def words_to_object(words_result):
start = 0
date_list = []
# print('words_result',words_result)
# 获取起始角标
for index,item in enumerate(words_result):
if item.get("words") == "收益率":
start = index
break
for index,item in enumerate(words_result):
if index > start:
words = item.get("words")
if '清仓次数' in words:
words_split = words.split(',清仓盈利')
value = words_split[1]
start_word = index
if value == '' and words_result[index + 1]:
value = words_result[index + 1].get("words")
start_word = index + 1
if index < len(words_result) - 1:
date_list.append({
"name": words_split[0],
"value": value,
"lists": [[]]
})
elif '清仓' in words:
date_list[-1]["lists"][-1].append(words)
if index < len(words_result) - 1 and '清仓次数' not in words_result[index + 1].get("words"):
date_list[-1]["lists"].append([])
else:
if index > start_word:
date_list[-1]["lists"][-1].append(words)
return date_list
9. 读取数据保存到excel中
- 将需要的数据拼接到一个列表中;
- 获取盈利数据转浮点数,返回列表;
- 对盈利点数求和;
- 通过盈利点数排序;
- 通过持有天数排序;
- 循环将数据存入到excel表格中;
- 判断表格是否存在,存在就删除,重新保存;
- 不存在就直接保存;
- 关闭弹窗,程序运行结束。
# 将读取数据保存到excel中
def save_info_to_excel(infos):
info = openpyxl.Workbook()
sheet = info.active
all_infos = []
for item in infos:
lists = item.get("lists")
if lists and len(lists) > 0:
all_infos += lists
all_infos_total = list(map(lambda vals: float(vals[2].replace(',','')), all_infos))
total = sum(all_infos_total)
# print('total',total)
sorted_infos = list(sorted(all_infos, key=lambda vals: float(vals[2].replace(',',''))))
# print('sorted_infos',sorted_infos)
days_infos = list(sorted(all_infos, key=lambda vals: int(vals[1])))
# print('days_infos',days_infos)
for index,vals in enumerate(all_infos):
# 直接数据列表
sheet.cell(row=index+1, column=1).value = vals[0]
sheet.cell(row=index+1, column=2).value = vals[1]
sheet.cell(row=index+1, column=3).value = vals[2]
sheet.cell(row=index+1, column=4).value = vals[3]
sheet.cell(row=index+1, column=5).value = vals[4]
# 盈利金额排序
infos = sorted_infos[index]
sheet.cell(row=index+1, column=7).value = infos[0]
sheet.cell(row=index+1, column=8).value = infos[1]
sheet.cell(row=index+1, column=9).value = infos[2]
sheet.cell(row=index+1, column=10).value = infos[3]
sheet.cell(row=index+1, column=11).value = infos[4]
# 按照持有时间排序
days = days_infos[index]
sheet.cell(row=index+1, column=13).value = days[0]
sheet.cell(row=index+1, column=14).value = days[1]
sheet.cell(row=index+1, column=15).value = days[2]
sheet.cell(row=index+1, column=16).value = days[3]
sheet.cell(row=index+1, column=17).value = days[4]
# 总积
sheet.cell(row=len(all_infos) + 1, column=1).value = '总计'
sheet.cell(row=len(all_infos) + 1, column=2).value = round(total,2)
if not os.path.exists('./股票清仓信息.xlsx'):
info.save('./股票清仓信息.xlsx')
else:
os.remove('./股票清仓信息.xlsx')
info.save('./股票清仓信息.xlsx')
print('股票清仓信息.xlsx保存成功')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
10. 获取信息图片示例
11. 运行实例
12. 运行结果
13. 各个文件的位置
14. 完整代码
import requests
import json
import base64
import os
import time
import re
import openpyxl
from openpyxl.styles import *
id = '35108270'
key = 'xxx'
secret = 'xxx'
# 获取 access_token
def get_access_token():
access_token = get_local_access_token()
if access_token == False:
return get_request_access_token()
else:
return access_token
# 获取本地的 access_token
def get_local_access_token():
# 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token
if not os.path.exists(f'./token/'):
os.makedirs(f'./token/')
return False
# 获取文件夹中的token
files = os.listdir("./token")
file_names = list(filter(lambda x : x.split('.').pop() in ['txt'], files))
# 如果没有找到保存 access_token 的文件,返回 False
if len(file_names) == 0:
return False
sort_names = list(sorted(file_names, key=lambda x:(int(re.sub('\D', '', x)),x)))
last_time_name = sort_names[-1]
# 存储token的时间
save_time = int(re.sub('\D', '', last_time_name))
# 由于token的有效时间是30天,因此判断29进行刷新
effective_time = 60 * 60 * 24 * 29
# 获取token执行的当前时间
current_time = int(time.time())
# 保存 access_token 的变量
access_token = ""
# 判断如果超出有效期,重新刷新 access_token
if current_time - save_time > effective_time:
return False
else:
# 获取本地文件中缓存的 access_token
with open("./token/" + last_time_name, "r",encoding="utf-8") as f:
access_token = f.read()
return access_token
# 获取百度的 access_token
def get_request_access_token():
url = f'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={key}&client_secret={secret}'
payload = ""
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
if response:
access_token = response.json().get('access_token')
if access_token:
with open("./token/" + str(int(time.time())) + ".txt", "w",encoding="utf-8") as f:
f.write(access_token)
return str(access_token)
else:
print(response.json())
print('无正确的 access_token 返回!!!')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
# 获取图片中的文字
def get_image_to_text(access_token, path):
request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"
# 二进制方式打开图片文件
with open(path, 'rb') as f:
img = base64.b64encode(f.read())
params = {"image":img}
request_url = request_url + "?access_token=" + access_token
headers = {'content-type': 'application/x-www-form-urlencoded'}
response = requests.post(request_url, data=params, headers=headers)
if response:
words_result = response.json().get("words_result")
if words_result:
print(f'图片:{path} 读取完成!')
return words_to_object(words_result)
time.sleep(0.5)
else:
print(response.json())
print('识别文字无返回!!!')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
# 将文本信息列表转字典
def words_to_object(words_result):
start = 0
date_list = []
# print('words_result',words_result)
# 获取起始角标
for index,item in enumerate(words_result):
if item.get("words") == "收益率":
start = index
break
for index,item in enumerate(words_result):
if index > start:
words = item.get("words")
if '清仓次数' in words:
words_split = words.split(',清仓盈利')
value = words_split[1]
start_word = index
if value == '' and words_result[index + 1]:
value = words_result[index + 1].get("words")
start_word = index + 1
if index < len(words_result) - 1:
date_list.append({
"name": words_split[0],
"value": value,
"lists": [[]]
})
elif '清仓' in words:
date_list[-1]["lists"][-1].append(words)
if index < len(words_result) - 1 and '清仓次数' not in words_result[index + 1].get("words"):
date_list[-1]["lists"].append([])
else:
if index > start_word:
date_list[-1]["lists"][-1].append(words)
return date_list
# 获取所有图片的图片信息
def get_all_image_info(path, image_names, access_token):
all_list_data = []
for name in image_names:
all_list_data += get_image_to_text(access_token, f"{path}/{name}")
# print('all_list_data',all_list_data)
return all_list_data
# 获取文件夹下所有图片文件名称
def get_all_image_names(path):
names = os.listdir(path)
image_names = list(filter(lambda x : x.split('.').pop() in ['jpg', 'png', 'jpeg', 'bmp'], names))
return image_names
# 将读取数据保存到excel中
def save_info_to_excel(infos):
info = openpyxl.Workbook()
sheet = info.active
all_infos = []
for item in infos:
lists = item.get("lists")
if lists and len(lists) > 0:
all_infos += lists
all_infos_total = list(map(lambda vals: float(vals[2].replace(',','')), all_infos))
total = sum(all_infos_total)
# print('total',total)
sorted_infos = list(sorted(all_infos, key=lambda vals: float(vals[2].replace(',',''))))
# print('sorted_infos',sorted_infos)
days_infos = list(sorted(all_infos, key=lambda vals: int(vals[1])))
# print('days_infos',days_infos)
for index,vals in enumerate(all_infos):
# 直接数据列表
sheet.cell(row=index+1, column=1).value = vals[0]
sheet.cell(row=index+1, column=2).value = vals[1]
sheet.cell(row=index+1, column=3).value = vals[2]
sheet.cell(row=index+1, column=4).value = vals[3]
sheet.cell(row=index+1, column=5).value = vals[4]
# 盈利金额排序
infos = sorted_infos[index]
sheet.cell(row=index+1, column=7).value = infos[0]
sheet.cell(row=index+1, column=8).value = infos[1]
sheet.cell(row=index+1, column=9).value = infos[2]
sheet.cell(row=index+1, column=10).value = infos[3]
sheet.cell(row=index+1, column=11).value = infos[4]
# 按照持有时间排序
days = days_infos[index]
sheet.cell(row=index+1, column=13).value = days[0]
sheet.cell(row=index+1, column=14).value = days[1]
sheet.cell(row=index+1, column=15).value = days[2]
sheet.cell(row=index+1, column=16).value = days[3]
sheet.cell(row=index+1, column=17).value = days[4]
# 总积
sheet.cell(row=len(all_infos) + 1, column=1).value = '总计'
sheet.cell(row=len(all_infos) + 1, column=2).value = round(total,2)
if not os.path.exists('./股票清仓信息.xlsx'):
info.save('./股票清仓信息.xlsx')
else:
os.remove('./股票清仓信息.xlsx')
info.save('./股票清仓信息.xlsx')
print('股票清仓信息.xlsx保存成功')
print('3s 后自动关闭打印窗口!')
time.sleep(3)
print('关闭打印窗口!')
if __name__ == '__main__':
path = './images'
# 获取 access_token
access_token = get_access_token()
# 获取images下所有文件
image_names = get_all_image_names(path)
# 获取所有图片的信息
if access_token and len(image_names) > 0:
all_info = get_all_image_info(path, image_names, access_token)
# 将信息存储到excel表格中
save_info_to_excel(all_info)
15. 总结
- 识别存在一定的误差,所以对返回数据进行处理时,需要细心筛选你需要的数据;
- access_token 是 30 天有效期,因此建议请求一次,就将最新的进行存储到本地,下次直接使用本地有效 access_token;
- 投资有风险,入行需谨慎。
转载自:https://juejin.cn/post/7387320448168443942