Esportazione di tabelle in un file CSV - Amazon Textract

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esportazione di tabelle in un file CSV

In questi esempi Python viene illustrato come esportare tabelle da un'immagine di un documento in un file (CSV) con valori separati dalla virgola.

L'esempio per l'analisi sincrona dei documenti raccoglie le informazioni della tabella da una chiamata aAnalyzeDocument. L'esempio per l'analisi asincrona dei documenti effettua una chiamata aStartDocumentAnalysise quindi recupera i risultati daGetDocumentAnalysiscomeBlockoggetti.

Le informazioni sulla tabella vengono restituite comeBlockoggetti da una chiamata aAnalyzeDocument. Per ulteriori informazioni, consultare Tabelle. LaBlockgli oggetti vengono memorizzati in una struttura mappa utilizzata per esportare i dati della tabella in un file CSV.

Synchronous

In questo esempio, utilizzerai le funzioni:

  • get_table_csv_results— ChiamaAnalyzeDocumente crea una mappa di tabelle rilevate nel documento. Crea una rappresentazione CSV di tutte le tabelle rilevate.

  • generate_table_csv— Genera il file CSV per una singola tabella.

  • get_rows_columns_map— Ottiene le righe e le colonne dalla mappa.

  • get_text— Ottiene il testo da una cella.

Per esportare le tabelle in un file CSV
  1. Configura l'ambiente. Per ulteriori informazioni, consultare Prerequisiti.

  2. Salvare il seguente codice di esempio in un file denominatotextract_python_table_parser.py.

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. Al prompt dei comandi inserisci il comando seguente: Replace (Sostituisci)filecon il nome del file immagine del documento che si desidera analizzare.

    python textract_python_table_parser.py file

Quando si esegue l'esempio, l'output CSV viene salvato in un file denominatooutput.csv.

Asynchronous

In questo esempio, utilizzerai due script diversi. Il primo script avvia il processo di analisi asincrona dei documenti conStartDocumentAnalysise ottiene ilBlockinformazioni restituite daGetDocumentAnalysis. Il secondo script prende il valore restituitoBlockinformazioni per ogni pagina, formatta i dati come tabella e salva le tabelle in un file CSV.

Per esportare le tabelle in un file CSV
  1. Configura l'ambiente. Per ulteriori informazioni, consultare Prerequisiti.

  2. Assicurati di aver seguito le istruzioni fornite a vediConfigurazione di Amazon Textract per operazioni asincrone. Il processo documentato in quella pagina consente di inviare e ricevere messaggi sullo stato di completamento dei processi asincroni.

  3. Nel seguente esempio di codice, sostituire il valore diroleArncon l'Arn assegnato al ruolo creato nella fase 2. Sostituisci il valore dibucketcon il nome del bucket S3 contenente il documento. Sostituisci il valore didocumentcon il nome del documento nel bucket S3. Sostituisci il valore diregion_namecon il nome della regione del bucket.

    Salvare il seguente codice di esempio in un file denominatostart_doc_analysis_for_table_extraction.py..

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. Eseguire il codice. Il codice stamperà un JobId. Copia questo JobId.

  5. Attendi che il lavoro finisca l'elaborazione e, una volta terminato, copia il seguente codice in un file denominatoget_doc_analysis_for_table_extraction.py. Sostituisci il valore dijobIdcon il Job ID copiato in precedenza. Sostituisci il valore diregion_namecon il nome della regione associata al ruolo Textract. Sostituisci il valore difile_namecon il nome da assegnare all'output CSV.

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. Eseguire il codice.

    Dopo aver ottenuto i risultati, accertarsi di eliminare le risorse SNS e SQS associate oppure è possibile accumulare addebiti per loro.