1 Star 0 Fork 0

beyondme121/pdf-extractor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
patch_gen_data_from_pdfs.py 10.78 KB
一键复制 编辑 原始数据 按行查看 历史
beyondme121 提交于 2022-08-14 09:11 . 提取参数和参数组
import pdfplumber as pdf_parser
import pandas as pd
import numpy as np
import os
import re
import utils
from tqdm import tqdm
pd.set_option('display.max_columns', None)
# 显示所有行
pd.set_option('display.max_rows', None)
# 设置value的显示长度为100,默认为50
pd.set_option('max_colwidth', 500)
class PDFParser(object):
def __init__(self, dir_pdf_filepath, page_from_to_config, output_path):
# self.__filename = filename
self.__pages = [] # 保存过滤后的文件页码, 根据指定条件
self.__result = [] # 保存最终的数据
self.dir_path = dir_pdf_filepath
self.page_from_to_config = page_from_to_config
self.output_path = output_path
def parse_pdf_table(self, filename, page_from, page_to):
"""
读取pdf文件, 从指定的开始和结束页码识别pdf中的table,
:param filename:
:param page_from:
:param page_to:
:return:
"""
with pdf_parser.open(filename) as pdf:
for i in tqdm(range(page_from - 1, page_to), desc='Processing' + filename):
current_page = pdf.pages[i]
table = current_page.extract_table()
if table is None or len(table) <= 2:
continue
else:
including_na_table = self.parse_empty_to_nan(table)
self.__result.append(including_na_table)
self.__pages.append(i)
return self
# 提取pdf-page的文字版本
def parse_page_to_text(self, filename, page_from, page_to):
with pdf_parser.open(filename) as pdf:
for i in tqdm(range(page_from - 1, page_to), desc='Processing' + filename):
current_page = pdf.pages[i]
page_text = current_page.extract_words()
print(page_text)
# self.__result.append(including_na_table)
# self.__pages.append(i)
return self
@staticmethod
def parse_empty_to_nan(page_table):
"""
将空单元格转换成 nan, 便于pandas 填充nan
:param page_table:
:return:
"""
_table = []
for row in page_table:
_row = []
for cell in row:
if cell is None:
_row.append(np.nan)
continue
elif re.match(r'^\s?$', cell):
_row.append(np.nan)
else:
_row.append(cell)
_table.append(_row)
return _table
def transform_data_new(self):
"""
转换数据
1. 跨页码时,在新的页码表格内容的第一行为空, 自动填充上一页表格的最后有内容的单元格内容
:return:
"""
_result = []
for ind, item in enumerate(self.__result): # item: 2 dim
for index in range(len(item)): # inner: 1 dim
if 0 < index < len(item) - 1:
pre_item = item[index - 1]
current_item = item[index]
next_item = item[index + 1]
# 如果当前行的前两列为空,并且当前行的下一行不为空,那就把当前行的其他列和当前行的上一行的内容进行合并, 同时删除当前行
if pd.isna(current_item[0]) and pd.isna(current_item[1]) and pd.isna(next_item[0]) is False:
# print("pre_item:", pre_item)
# print("current_item:", current_item)
# print("next_item:", next_item)
# print("==>", pre_item[2].join(current_item[2]))
_result.append(current_item)
else:
# print("inner else current_item:", current_item)
_result.append(current_item)
else:
# print("outer else current_item:", item[index])
_result.append(item[index])
# if pd.isna(item[index][0]) and pd.isna(item[index][1]) and pd.isna(item[index+1][0]) is False:
# item[index-1][2] = item[index-1][2].join(item[index][2])
# print("current:", item[index])
# print("next:", item[index + 1])
df = pd.DataFrame(_result)
df.fillna(method='ffill', inplace=True) # na单元格填充
self.__result = df.drop_duplicates()
return self
def transform_data(self, is_fill):
"""
:param is_fill: 提取的行数据,当当前单元格内容为nan,是否使用上一个单元格内容进行填充
:return:
"""
_list = []
print("is_fill,", is_fill)
for item in self.__result:
df = pd.DataFrame(item)
if is_fill is True:
df.fillna(method='ffill', inplace=True) # na单元格填充
df.drop_duplicates() # 删除每一页重复行
_list.append(df)
# df.drop_duplicates()
self.__result = pd.concat(_list).drop_duplicates()
return self
# 对原始数据进行向上填充
def transform_ffill(self):
_result = []
for index, page in enumerate(self.__result): # item: 2 dim
if index == 0:
for j, item in enumerate(page):
_result.append(item)
else:
del page[0]
for j, item in enumerate(page):
_result.append(item)
df = pd.DataFrame(_result)
df.fillna(method='ffill', inplace=True) # na单元格填充
self.__result = df.drop_duplicates()
return self
def transform_data_review(self, is_fill):
_result = []
# 保存每一页表头的列的数量
page_columns_count = []
# 遍历每一页的table数组, 保存表头的列数
for index, page in enumerate(self.__result): # item: 2 dim
for j, item in enumerate(page):
if j == 0:
page_columns_count.append(len(item))
# 如果每页的列数相同直接转换(即每一页的任意一列都没有纵向单元格拆分)
if (np.sum(page_columns_count) / len(page_columns_count)) % 2 == 0:
for index, page in enumerate(self.__result):
# 如果不是第一页的table, 保留表头名称
if index != 0:
del page[0]
for j, item in enumerate(page): # item是某一页的每一行
# print(item)
_result.append(item)
else:
for index, page in enumerate(self.__result):
# 当前page页,表头列的数量
header_columns_count = len(page[0])
# 如果不是第一页的table, 并且表头的列数和第一页的列数相同, 删除第一行的表头, 然后存储内容数据, 不需要单独处理
if index == 0:
for j, item in enumerate(page): # item是某一页的每一行
# print("当前页的表头与第一页的表头一致", page_from + index + 1)
# print(item)
_result.append(item)
else:
# 如果不是第一页,后面的page页需要把table标题删除
del page[0]
if header_columns_count <= page_columns_count[0]:
# print("当前页的表头与第一页的表头一致", page_from + index + 1)
for j, item in enumerate(page): # item是某一页的每一行
# print(item)
_result.append(item)
else:
# print("差异表头:", page_from + index + 1)
# 当某一页的列数量大于第一页的列数量,根据观察,将当前行的下标为2,3的内容进行拼接
for j, item in enumerate(page):
# print(item)
# print(item[2], pd.isna(item[2]), item[3], pd.isna(item[3]))
if pd.isna(item[2]) is True and pd.isna(item[3]) is True:
item[2], item[3] = '', ''
elif pd.isna(item[3]) is True:
item[3] = ''
else:
item[2] = str(item[2]) + ":" + str(item[3])
# 最终删除纵向(一个单元格从上到下的线)拆分的单元格
del item[3]
_result.append(item)
df = pd.DataFrame(_result)
# 如果需要填充,解决上下合并单元格的问题
if is_fill == 'True':
print(is_fill)
df.fillna(method='ffill', inplace=True) # na单元格填充
self.__result = df.drop_duplicates()
return self
# 指定输出路径文件,将结果保存到路径中
def to_excel_file(self, file_path):
if os.path.exists(file_path):
print('预先删除历史文件数据')
os.remove(file_path)
else:
# 如果文件所在路径不存在,返回False, 并创建路径
is_file_path_exists = utils.create_file_path_not_exists(file_path)
# 如果文件所在路径存在
if is_file_path_exists:
print('文件' + file_path + '不存在, 程序自动创建输出文件')
self.__result.to_excel(file_path, header=False, index=False)
self.__result = []
# 暴露统一调用接口API
def patch_generate(self):
config = pd.read_json(self.page_from_to_config)
for index, row in config.iterrows():
input_pdf_file = self.dir_path + row["filename"]
if pd.isna(row["output_file"]) is False:
output_file = row["output_file"] + '.xlsx'
else:
output_file = row["filename"].split('.')[0] + '.xlsx'
# 提取表格数据: 融入具体场景,加入容错的数据转换
self.parse_pdf_table(input_pdf_file, row["page_from"], row["page_to"]).transform_data_review(
is_fill=row["is_fill"]).to_excel_file(
self.output_path + output_file)
self.parse_page_to_text(input_pdf_file, row["page_from"], row["page_to"])
# 原始提取
# self.parse_pdf_table(input_pdf_file, row["page_from"], row["page_to"]).transform_data(
# is_fill=row["is_fill"]).to_excel_file(
# self.output_path + output_file)
# pdf_instance = PDFParser(dir_pdf_filepath="./product-book/",
# page_from_to_config="page_from_to_config.json",
# output_path="./output/"
# )
# pdf_instance.patch_generate()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/sanfeng8848/pdf-extractor.git
[email protected]:sanfeng8848/pdf-extractor.git
sanfeng8848
pdf-extractor
pdf-extractor
main

搜索帮助