将表导出到 CSV 文件 - Amazon Textract

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将表导出到 CSV 文件

这些 Python 示例演示了如何将表从文档图像导出为以逗号分隔的值 (CSV) 文件。

同步文档分析的示例从调用中收集表格信息AnalyzeDocument. 异步文档分析的示例调用StartDocumentAnalysis然后从中检索结果GetDocumentAnalysis如同Block对象。

表格信息返回为Block来自调用的对象AnalyzeDocument. 有关更多信息,请参阅 。这些区域有:Block对象存储在用于将表数据导出到 CSV 文件的地图结构中。

Synchronous

在此示例中,您将使用以下函数:

  • get_table_csv_results— 调用AnalyzeDocument,然后构建文档中检测到的表格的映射。创建所有检测到的表格的 CSV 表示形式。

  • generate_table_csv— 为单个表生成 CSV 文件。

  • get_rows_columns_map— 从地图中获取行和列。

  • get_text— 从单元格中获取文本。

将表导出到 CSV 文件
  1. 配置您的环境。有关更多信息,请参阅 先决条件

  2. 将以下示例代码保存到名为的文件中textract_python_table_parser.py.

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. 在命令提示符处,输入以下命令:Replacefile将显示您要分析的文档图像文件的名称。

    python textract_python_table_parser.py file

运行示例时,CSV 输出将保存在名为的文件中output.csv.

Asynchronous

在此示例中,您将使用两个不同的脚本。第一个脚本启动了异步分析文档的过程StartDocumentAnalysis并获取Block返回的信息GetDocumentAnalysis. 第二个脚本采用返回的Block每个页面的信息,将数据格式化为表格,然后将表格保存到 CSV 文件中。

将表导出到 CSV 文件
  1. 配置您的环境。有关更多信息,请参阅 先决条件

  2. 确保你已按照见上的说明进行操作为异步操作配置 Amazon Textract. 该页面上记录的过程使您能够发送和接收有关异步作业完成状态的消息。

  3. 在以下代码示例中,替换roleArn将 Arn 分配给您在步骤 2 中创建的角色。替换的值bucket将包含您的文档的 S3 存储桶的名称。替换的值document将包含您的 S3 存储桶中的文档的名称。替换的值region_name将包含您的存储桶所在区域的名称。

    将以下示例代码保存到名为的文件中start_doc_analysis_for_table_extraction.py。.

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. 运行该代码。该代码将打印一个 JobId。向下复制这个 JobId。

  5. 等待您的作业完成处理,完成后,将以下代码复制到名为的文件get_doc_analysis_for_table_extraction.py. 替换的值jobId用你之前复制的 Job ID。替换的值region_name使用与 Textract 角色关联的区域的名称。替换的值file_name使用您要为输出 CSV 指定的名称。

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. 运行该代码。

    获得结果后,请务必删除关联的 SNS 和 SQS 资源,否则可能会为它们产生费用。