likes
comments
collection
share

【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】

作者站长头像
站长
· 阅读数 44

1. 需求分析

  1. 识别图片中的文字【采用百度的通用文字识别】;
  2. 文字筛选,按照分类获取对应的文本;
  3. 采用 openpyxl 实现将数据存入 excel 中。

2. 获取 access_token

  1. 获取本地缓存的 access_token;
  2. 如果本地缓存的 access_token 过期,从百度获取远程 access_token。
# 获取 access_token
def get_access_token():
  access_token = get_local_access_token()
  if access_token == False:
    return get_request_access_token()
  else:
    return access_token

3. 本地 access_token

  1. 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token;
  2. 获取文件夹中的 access_token 文件,筛选文本文件,access_token 保存在文本文件中;
  3. 如果没有找到保存 access_token 的文件,返回 False;
  4. 获取文件名中的数字,进行排序,获取最后一个最新的文件;
  5. 获取存储token的时间;
  6. 由于token的有效时间是30天,因此判断29进行刷新;
  7. 获取token执行的当前时间;
  8. 判断如果超出有效期,重新刷新 access_token;
  9. 获取本地文件中缓存的 access_token。
# 获取本地的 access_token
def get_local_access_token():
  # 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token
  if not os.path.exists(f'./token/'):
    os.makedirs(f'./token/')
    return False
  # 获取文件夹中的token
  files = os.listdir("./token")
  file_names = list(filter(lambda x : x.split('.').pop() in ['txt'], files))
  # 如果没有找到保存 access_token 的文件,返回 False
  if len(file_names) == 0:
    return False
  sort_names = list(sorted(file_names, key=lambda x:(int(re.sub('\D', '', x)),x)))
  last_time_name = sort_names[-1]
  # 存储token的时间
  save_time = int(re.sub('\D', '', last_time_name))
  # 由于token的有效时间是30天,因此判断29进行刷新
  effective_time = 60 * 60 * 24 * 29
  # 获取token执行的当前时间
  current_time = int(time.time())
  # 保存 access_token 的变量
  access_token = ""
  # 判断如果超出有效期,重新刷新 access_token
  if current_time - save_time > effective_time:
    return False
  else:
    # 获取本地文件中缓存的 access_token
    with open("./token/" + last_time_name, "r",encoding="utf-8") as f:
      access_token = f.read()
  return access_token

4. 远程 access_token

  1. 使用在百度控制台获取的 key 和 secret,组装一个获取 access_token 的地址;
  2. 定义一个发起请求的 headers ;
  3. 使用 requests 发起请求,获取 access_token ;
  4. 获取返回结果【response】中的 access_token ;
  5. 判断 access_token 存在,就以当前时间戳为文件名,将 access_token 写入文件中;
  6. 返回获取的远程的 access_token;
  7. 如果 access_token 无效,输出报错,关闭弹窗。
# 获取百度的 access_token
def get_request_access_token():
  url = f'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={key}&client_secret={secret}'
  payload = ""
  headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
  }
  response = requests.request("POST", url, headers=headers, data=payload)
  if response:
    access_token = response.json().get('access_token')
    if access_token:
      with open("./token/" + str(int(time.time())) + ".txt", "w",encoding="utf-8") as f:
        f.write(access_token)
      return str(access_token)
    else:
      print(response.json())
      print('无正确的 access_token 返回!!!')
      print('3s 后自动关闭打印窗口!')
      time.sleep(3)
      print('关闭打印窗口!')

5. 获取所有需要识别的图片

  1. 获取文件夹下的所有文件名称;
  2. 过滤获取所有的图片文件名列表。
# 获取文件夹下所有图片文件名称
def get_all_image_names(path):
  names = os.listdir(path)
  image_names = list(filter(lambda x : x.split('.').pop() in ['jpg', 'png', 'jpeg', 'bmp'], names))
  return image_names

6. 获取所有图片的图片信息

  1. 使用 all_list_data 存储返回图片的信息;
  2. 循环图片名列表,读取每一张图片的信息;
  3. 返回所有图片的信息。
# 获取所有图片的图片信息
def get_all_image_info(path, image_names, access_token):
  all_list_data = []
  for name in image_names:
    all_list_data += get_image_to_text(access_token, f"{path}/{name}")
  # print('all_list_data',all_list_data)
  return all_list_data

7. 获取图片中的文字

  1. 设置采用识别文字的api接口;
  2. 读取图片,组装为请求参数;
  3. 用请求接口和 access_token 拼接为完整的获取图片中文本信息的接口;
  4. 使用 requests 发起请求,获取图片中识别的文本信息;
  5. 读取返回的文本结果 words_result;
  6. 如果 words_result 存在,使用 words_to_object 对每个返回结果进行处理;
  7. 否则输出错误,同时关闭窗口。
# 获取图片中的文字
def get_image_to_text(access_token, path):
  request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"
  # 二进制方式打开图片文件
  with open(path, 'rb') as f:
    img = base64.b64encode(f.read())
    params = {"image":img}
    request_url = request_url + "?access_token=" + access_token
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    response = requests.post(request_url, data=params, headers=headers)
    if response:
      words_result = response.json().get("words_result")
      if words_result:
        print(f'图片:{path} 读取完成!')
        return words_to_object(words_result)
        time.sleep(0.5)
      else:
        print(response.json())
        print('识别文字无返回!!!')
        print('3s 后自动关闭打印窗口!')
        time.sleep(3)
        print('关闭打印窗口!')

8. 文本信息筛选

  1. 通过【收益率】获取需要数据的起始标识;
  2. 筛选获取【清仓次数】关键字,为当前次进入;
  3. 否则【清仓】存在,就是结束。
# 将文本信息列表转字典
def words_to_object(words_result):
  start = 0
  date_list = []
  # print('words_result',words_result)
  # 获取起始角标
  for index,item in enumerate(words_result):
    if item.get("words") == "收益率":
      start = index
      break
  for index,item in enumerate(words_result):
    if index > start:
      words = item.get("words")
      if '清仓次数' in words:
        words_split = words.split(',清仓盈利')
        value = words_split[1]
        start_word = index
        if value == '' and words_result[index + 1]:
          value = words_result[index + 1].get("words")
          start_word = index + 1
        if index < len(words_result) - 1:
          date_list.append({
            "name": words_split[0],
            "value": value,
            "lists": [[]]
          })
      elif '清仓' in words:
        date_list[-1]["lists"][-1].append(words)
        if index < len(words_result) - 1 and '清仓次数' not in words_result[index + 1].get("words"):
          date_list[-1]["lists"].append([])
      else:
        if index > start_word:
          date_list[-1]["lists"][-1].append(words)
  return date_list

9. 读取数据保存到excel中

  1. 将需要的数据拼接到一个列表中;
  2. 获取盈利数据转浮点数,返回列表;
  3. 对盈利点数求和;
  4. 通过盈利点数排序;
  5. 通过持有天数排序;
  6. 循环将数据存入到excel表格中;
  7. 判断表格是否存在,存在就删除,重新保存;
  8. 不存在就直接保存;
  9. 关闭弹窗,程序运行结束。
# 将读取数据保存到excel中
def save_info_to_excel(infos):
  info = openpyxl.Workbook()
  sheet = info.active
  all_infos = []
  for item in infos:
    lists = item.get("lists")
    if lists and len(lists) > 0:
      all_infos += lists
  all_infos_total = list(map(lambda vals: float(vals[2].replace(',','')), all_infos))      
  total = sum(all_infos_total)
  # print('total',total)
  sorted_infos = list(sorted(all_infos, key=lambda vals: float(vals[2].replace(',',''))))
  # print('sorted_infos',sorted_infos)
  days_infos = list(sorted(all_infos, key=lambda vals: int(vals[1])))
  # print('days_infos',days_infos)
  for index,vals in enumerate(all_infos):
    # 直接数据列表
    sheet.cell(row=index+1, column=1).value = vals[0]
    sheet.cell(row=index+1, column=2).value = vals[1]
    sheet.cell(row=index+1, column=3).value = vals[2]
    sheet.cell(row=index+1, column=4).value = vals[3]
    sheet.cell(row=index+1, column=5).value = vals[4]
    # 盈利金额排序
    infos = sorted_infos[index]
    sheet.cell(row=index+1, column=7).value = infos[0]
    sheet.cell(row=index+1, column=8).value = infos[1]
    sheet.cell(row=index+1, column=9).value = infos[2]
    sheet.cell(row=index+1, column=10).value = infos[3]
    sheet.cell(row=index+1, column=11).value = infos[4]
    # 按照持有时间排序
    days = days_infos[index]
    sheet.cell(row=index+1, column=13).value = days[0]
    sheet.cell(row=index+1, column=14).value = days[1]
    sheet.cell(row=index+1, column=15).value = days[2]
    sheet.cell(row=index+1, column=16).value = days[3]
    sheet.cell(row=index+1, column=17).value = days[4]
  # 总积
  sheet.cell(row=len(all_infos) + 1, column=1).value = '总计'
  sheet.cell(row=len(all_infos) + 1, column=2).value = round(total,2)

  if not os.path.exists('./股票清仓信息.xlsx'):
    info.save('./股票清仓信息.xlsx')
  else:
    os.remove('./股票清仓信息.xlsx')
    info.save('./股票清仓信息.xlsx')
  print('股票清仓信息.xlsx保存成功')
  print('3s 后自动关闭打印窗口!')
  time.sleep(3)
  print('关闭打印窗口!')

10. 获取信息图片示例

【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】 【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】

11. 运行实例

【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】

12. 运行结果

【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】

13. 各个文件的位置

【Python 实战】---- 批量识别图片中的文字,存入excel中【使用百度的通用文字识别】

14. 完整代码

import requests
import json
import base64
import os
import time
import re
import openpyxl
from openpyxl.styles import *

id = '35108270'
key = 'xxx'
secret = 'xxx'

# 获取 access_token
def get_access_token():
  access_token = get_local_access_token()
  if access_token == False:
    return get_request_access_token()
  else:
    return access_token

# 获取本地的 access_token
def get_local_access_token():
  # 是否存在保存 token 的文件夹,不存在就创建,并且返回 False,表示本地没有找到 access_token
  if not os.path.exists(f'./token/'):
    os.makedirs(f'./token/')
    return False
  # 获取文件夹中的token
  files = os.listdir("./token")
  file_names = list(filter(lambda x : x.split('.').pop() in ['txt'], files))
  # 如果没有找到保存 access_token 的文件,返回 False
  if len(file_names) == 0:
    return False
  sort_names = list(sorted(file_names, key=lambda x:(int(re.sub('\D', '', x)),x)))
  last_time_name = sort_names[-1]
  # 存储token的时间
  save_time = int(re.sub('\D', '', last_time_name))
  # 由于token的有效时间是30天,因此判断29进行刷新
  effective_time = 60 * 60 * 24 * 29
  # 获取token执行的当前时间
  current_time = int(time.time())
  # 保存 access_token 的变量
  access_token = ""
  # 判断如果超出有效期,重新刷新 access_token
  if current_time - save_time > effective_time:
    return False
  else:
    # 获取本地文件中缓存的 access_token
    with open("./token/" + last_time_name, "r",encoding="utf-8") as f:
      access_token = f.read()
  return access_token

# 获取百度的 access_token
def get_request_access_token():
  url = f'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={key}&client_secret={secret}'
  payload = ""
  headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
  }
  response = requests.request("POST", url, headers=headers, data=payload)
  if response:
    access_token = response.json().get('access_token')
    if access_token:
      with open("./token/" + str(int(time.time())) + ".txt", "w",encoding="utf-8") as f:
        f.write(access_token)
      return str(access_token)
    else:
      print(response.json())
      print('无正确的 access_token 返回!!!')
      print('3s 后自动关闭打印窗口!')
      time.sleep(3)
      print('关闭打印窗口!')

# 获取图片中的文字
def get_image_to_text(access_token, path):
  request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"
  # 二进制方式打开图片文件
  with open(path, 'rb') as f:
    img = base64.b64encode(f.read())
    params = {"image":img}
    request_url = request_url + "?access_token=" + access_token
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    response = requests.post(request_url, data=params, headers=headers)
    if response:
      words_result = response.json().get("words_result")
      if words_result:
        print(f'图片:{path} 读取完成!')
        return words_to_object(words_result)
        time.sleep(0.5)
      else:
        print(response.json())
        print('识别文字无返回!!!')
        print('3s 后自动关闭打印窗口!')
        time.sleep(3)
        print('关闭打印窗口!')

# 将文本信息列表转字典
def words_to_object(words_result):
  start = 0
  date_list = []
  # print('words_result',words_result)
  # 获取起始角标
  for index,item in enumerate(words_result):
    if item.get("words") == "收益率":
      start = index
      break
  for index,item in enumerate(words_result):
    if index > start:
      words = item.get("words")
      if '清仓次数' in words:
        words_split = words.split(',清仓盈利')
        value = words_split[1]
        start_word = index
        if value == '' and words_result[index + 1]:
          value = words_result[index + 1].get("words")
          start_word = index + 1
        if index < len(words_result) - 1:
          date_list.append({
            "name": words_split[0],
            "value": value,
            "lists": [[]]
          })
      elif '清仓' in words:
        date_list[-1]["lists"][-1].append(words)
        if index < len(words_result) - 1 and '清仓次数' not in words_result[index + 1].get("words"):
          date_list[-1]["lists"].append([])
      else:
        if index > start_word:
          date_list[-1]["lists"][-1].append(words)
  return date_list

# 获取所有图片的图片信息
def get_all_image_info(path, image_names, access_token):
  all_list_data = []
  for name in image_names:
    all_list_data += get_image_to_text(access_token, f"{path}/{name}")
  # print('all_list_data',all_list_data)
  return all_list_data

# 获取文件夹下所有图片文件名称
def get_all_image_names(path):
  names = os.listdir(path)
  image_names = list(filter(lambda x : x.split('.').pop() in ['jpg', 'png', 'jpeg', 'bmp'], names))
  return image_names

# 将读取数据保存到excel中
def save_info_to_excel(infos):
  info = openpyxl.Workbook()
  sheet = info.active
  all_infos = []
  for item in infos:
    lists = item.get("lists")
    if lists and len(lists) > 0:
      all_infos += lists
  all_infos_total = list(map(lambda vals: float(vals[2].replace(',','')), all_infos))      
  total = sum(all_infos_total)
  # print('total',total)
  sorted_infos = list(sorted(all_infos, key=lambda vals: float(vals[2].replace(',',''))))
  # print('sorted_infos',sorted_infos)
  days_infos = list(sorted(all_infos, key=lambda vals: int(vals[1])))
  # print('days_infos',days_infos)
  for index,vals in enumerate(all_infos):
    # 直接数据列表
    sheet.cell(row=index+1, column=1).value = vals[0]
    sheet.cell(row=index+1, column=2).value = vals[1]
    sheet.cell(row=index+1, column=3).value = vals[2]
    sheet.cell(row=index+1, column=4).value = vals[3]
    sheet.cell(row=index+1, column=5).value = vals[4]
    # 盈利金额排序
    infos = sorted_infos[index]
    sheet.cell(row=index+1, column=7).value = infos[0]
    sheet.cell(row=index+1, column=8).value = infos[1]
    sheet.cell(row=index+1, column=9).value = infos[2]
    sheet.cell(row=index+1, column=10).value = infos[3]
    sheet.cell(row=index+1, column=11).value = infos[4]
    # 按照持有时间排序
    days = days_infos[index]
    sheet.cell(row=index+1, column=13).value = days[0]
    sheet.cell(row=index+1, column=14).value = days[1]
    sheet.cell(row=index+1, column=15).value = days[2]
    sheet.cell(row=index+1, column=16).value = days[3]
    sheet.cell(row=index+1, column=17).value = days[4]
  # 总积
  sheet.cell(row=len(all_infos) + 1, column=1).value = '总计'
  sheet.cell(row=len(all_infos) + 1, column=2).value = round(total,2)

  if not os.path.exists('./股票清仓信息.xlsx'):
    info.save('./股票清仓信息.xlsx')
  else:
    os.remove('./股票清仓信息.xlsx')
    info.save('./股票清仓信息.xlsx')
  print('股票清仓信息.xlsx保存成功')
  print('3s 后自动关闭打印窗口!')
  time.sleep(3)
  print('关闭打印窗口!')

if __name__ == '__main__':
  path = './images'
  # 获取 access_token
  access_token = get_access_token()
  # 获取images下所有文件
  image_names = get_all_image_names(path)
  # 获取所有图片的信息
  if access_token and len(image_names) > 0:
    all_info = get_all_image_info(path, image_names, access_token)
    # 将信息存储到excel表格中
    save_info_to_excel(all_info)

15. 总结

  1. 识别存在一定的误差,所以对返回数据进行处理时,需要细心筛选你需要的数据;
  2. access_token 是 30 天有效期,因此建议请求一次,就将最新的进行存储到本地,下次直接使用本地有效 access_token;
  3. 投资有风险,入行需谨慎。
转载自:https://juejin.cn/post/7387320448168443942
评论
请登录