

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 테이블을 CSV 파일로 내보내기
<a name="examples-export-table-csv"></a>

이 Python 예제는 문서의 이미지에서 테이블을 CSV (쉼표로 구분된 값) 파일로 내보내는 방법을 보여줍니다.

동기식 문서 분석의 예에서는 호출에서 테이블 정보를 수집합니다.[AnalyzeDocument](API_AnalyzeDocument.md). 비동기 문서 분석의 예제는 다음을 호출합니다.[StartDocumentAnalysis](API_StartDocumentAnalysis.md)그런 다음 다음 결과를 반환합니다.[GetDocumentAnalysis](API_GetDocumentAnalysis.md)같이`Block`객체.

테이블 정보는 다음과 같이 반환됩니다.[Block](API_Block.md)호출에서 받는 객체[AnalyzeDocument](API_AnalyzeDocument.md). 자세한 정보는 [테이블](how-it-works-tables.md)을 참조하십시오. 이`Block`객체는 테이블 데이터를 CSV 파일로 내보내는 데 사용되는 맵 구조에 저장됩니다.

------
#### [ Synchronous ]

이 예제에서는 다음과 같은 함수를 사용합니다.
+ `get_table_csv_results`— 통화[AnalyzeDocument](API_AnalyzeDocument.md)을 (를) 클릭하고 문서에서 검색된 테이블 맵을 작성합니다. 감지된 모든 테이블의 CSV 표현을 만듭니다.
+ `generate_table_csv`— 개별 테이블에 대한 CSV 파일을 생성합니다.
+ `get_rows_columns_map`— 맵에서 행과 열을 가져옵니다.
+ `get_text`— 셀에서 텍스트를 가져옵니다.

**테이블을 CSV 파일로 내보내려면**

1. 환경을 구성합니다. 자세한 정보는 [사전 조건](examples-blocks.md#examples-prerequisites)을 참조하십시오.

1. 다음 예제 코드를 이라는 파일에 저장합니다.*textract\$1python\$1table\$1parser.py*.

   ```
   import webbrowser, os
   import json
   import boto3
   import io
   from io import BytesIO
   import sys
   from pprint import pprint
   
   
   def get_rows_columns_map(table_result, blocks_map):
       rows = {}
       for relationship in table_result['Relationships']:
           if relationship['Type'] == 'CHILD':
               for child_id in relationship['Ids']:
                   cell = blocks_map[child_id]
                   if cell['BlockType'] == 'CELL':
                       row_index = cell['RowIndex']
                       col_index = cell['ColumnIndex']
                       if row_index not in rows:
                           # create new row
                           rows[row_index] = {}
                           
                       # get the text value
                       rows[row_index][col_index] = get_text(cell, blocks_map)
       return rows
   
   
   def get_text(result, blocks_map):
       text = ''
       if 'Relationships' in result:
           for relationship in result['Relationships']:
               if relationship['Type'] == 'CHILD':
                   for child_id in relationship['Ids']:
                       word = blocks_map[child_id]
                       if word['BlockType'] == 'WORD':
                           text += word['Text'] + ' '
                       if word['BlockType'] == 'SELECTION_ELEMENT':
                           if word['SelectionStatus'] =='SELECTED':
                               text +=  'X '    
       return text
   
   
   def get_table_csv_results(file_name):
   
       with open(file_name, 'rb') as file:
           img_test = file.read()
           bytes_test = bytearray(img_test)
           print('Image loaded', file_name)
   
       # process using image bytes
       # get the results
       client = boto3.client('textract')
   
       response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES'])
   
       # Get the text blocks
       blocks=response['Blocks']
       pprint(blocks)
   
       blocks_map = {}
       table_blocks = []
       for block in blocks:
           blocks_map[block['Id']] = block
           if block['BlockType'] == "TABLE":
               table_blocks.append(block)
   
       if len(table_blocks) <= 0:
           return "<b> NO Table FOUND </b>"
   
       csv = ''
       for index, table in enumerate(table_blocks):
           csv += generate_table_csv(table, blocks_map, index +1)
           csv += '\n\n'
   
       return csv
   
   def generate_table_csv(table_result, blocks_map, table_index):
       rows = get_rows_columns_map(table_result, blocks_map)
   
       table_id = 'Table_' + str(table_index)
       
       # get cells.
       csv = 'Table: {0}\n\n'.format(table_id)
   
       for row_index, cols in rows.items():
           
           for col_index, text in cols.items():
               csv += '{}'.format(text) + ","
           csv += '\n'
           
       csv += '\n\n\n'
       return csv
   
   def main(file_name):
       table_csv = get_table_csv_results(file_name)
   
       output_file = 'output.csv'
   
       # replace content
       with open(output_file, "wt") as fout:
           fout.write(table_csv)
   
       # show the results
       print('CSV OUTPUT FILE: ', output_file)
   
   
   if __name__ == "__main__":
       file_name = sys.argv[1]
       main(file_name)
   ```

1. 명령 프롬프트에서 다음 명령을 입력합니다. Replace`file`분석할 문서 이미지 파일의 이름을 사용합니다.

   ```
   python textract_python_table_parser.py file
   ```

예제를 실행하면 CSV 출력이 라는 파일에 저장됩니다.`output.csv`.

------
#### [ Asynchronous ]

이 예에서는 두 개의 서로 다른 스크립트를 make를 사용합니다. 첫 번째 스크립트는 다음과 같이 문서를 비동기적으로 분석하는 프로세스를 시작합니다.`StartDocumentAnalysis`를 가져옵니다.`Block`다음에 의해 반환되는 정보`GetDocumentAnalysis`. 두 번째 스크립트는 반환된 스크립트를 사용합니다.`Block`각 페이지에 대한 정보는 데이터를 테이블로 포맷하고 테이블을 CSV 파일에 저장합니다.

**테이블을 CSV 파일로 내보내려면**

1. 환경을 구성합니다. 자세한 정보는 [사전 조건](examples-blocks.md#examples-prerequisites)을 참조하십시오.

1. 참조 에 제공된 지침을 따랐는지 확인합니다.[비동기 작업을 위한 Amazon Textract 구성](api-async-roles.md). 이 페이지에 설명된 프로세스를 통해 비동기 작업의 완료 상태에 대한 메시지를 보내고 받을 수 있습니다.

1. 다음 코드 예제에서 다음 값을 바꿉니다.`roleArn`Arn이 2단계에서 생성한 역할에 할당됩니다. 값 바꾸기`bucket`문서가 포함된 S3 버킷의 이름입니다. 값 바꾸기`document`을 S3 버킷에 있는 문서의 이름으로 값 바꾸기`region_name`을 버킷 리전 이름으로 바꿉니다.

   다음 예제 코드를 이라는 파일에 저장합니다.*start\$1doc\$1analysis\$1for\$1table\$1extraction.py.*.

   ```
   import boto3
   import time
   
   class DocumentProcessor:
   
       jobId = ''
       region_name = ''
   
       roleArn = ''
       bucket = ''
       document = ''
   
       sqsQueueUrl = ''
       snsTopicArn = ''
       processType = ''
   
       def __init__(self, role, bucket, document, region):
           self.roleArn = role
           self.bucket = bucket
           self.document = document
           self.region_name = region
   
           self.textract = boto3.client('textract', region_name=self.region_name)
           self.sqs = boto3.client('sqs')
           self.sns = boto3.client('sns')
   
       def ProcessDocument(self):
   
           jobFound = False
   
           response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}},
                   FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
           print('Processing type: Analysis')
   
           print('Start Job Id: ' + response['JobId'])
   
           print('Done!')
   
       def CreateTopicandQueue(self):
   
           millis = str(int(round(time.time() * 1000)))
   
           # Create SNS topic
           snsTopicName = "AmazonTextractTopic" + millis
   
           topicResponse = self.sns.create_topic(Name=snsTopicName)
           self.snsTopicArn = topicResponse['TopicArn']
   
           # create SQS queue
           sqsQueueName = "AmazonTextractQueue" + millis
           self.sqs.create_queue(QueueName=sqsQueueName)
           self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']
   
           attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,
                                                   AttributeNames=['QueueArn'])['Attributes']
   
           sqsQueueArn = attribs['QueueArn']
   
           # Subscribe SQS queue to SNS topic
           self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn)
   
           # Authorize SNS to write SQS queue
           policy = """{{
         "Version":"2012-10-17",
         "Statement":[
           {{
             "Sid":"MyPolicy",
             "Effect":"Allow",
             "Principal" : {{"AWS" : "*"}},
             "Action":"SQS:SendMessage",
             "Resource": "{}",
             "Condition":{{
               "ArnEquals":{{
                 "aws:SourceArn": "{}"
               }}
             }}
           }}
         ]
       }}""".format(sqsQueueArn, self.snsTopicArn)
   
           response = self.sqs.set_queue_attributes(
               QueueUrl=self.sqsQueueUrl,
               Attributes={
                   'Policy': policy
               })
   
   def main():
       roleArn = 'role-arn'
       bucket = 'bucket-name'
       document = 'document-name'
       region_name = 'region-name'
   
       analyzer = DocumentProcessor(roleArn, bucket, document, region_name)
       analyzer.CreateTopicandQueue()
       analyzer.ProcessDocument()
   
   if __name__ == "__main__":
       main()
   ```

1. 코드를 실행합니다. 코드는 JobId 인쇄합니다. 이 JobId 복사합니다.

1.  작업 처리가 완료될 때까지 기다렸다가 작업이 완료된 후 다음 코드를 라는 파일에 복사합니다.*get\$1doc\$1analysis\$1for\$1table\$1extraction.py*. 값 바꾸기`jobId`이전에 복사한 Job ID를 사용합니다. 값 바꾸기`region_name`Textract 역할과 연결된 리전의 이름을 사용합니다. 값 바꾸기`file_name`를 CSV로 제공하려는 이름으로

   ```
   import boto3
   from pprint import pprint
   
   jobId = 'job-id'
   region_name = 'region-name'
   file_name = "output-file-name.csv"
   
   textract = boto3.client('textract', region_name=region_name)
   
   # Display information about a block
   def DisplayBlockInfo(block):
       print("Block Id: " + block['Id'])
       print("Type: " + block['BlockType'])
       if 'EntityTypes' in block:
           print('EntityTypes: {}'.format(block['EntityTypes']))
   
       if 'Text' in block:
           print("Text: " + block['Text'])
   
       if block['BlockType'] != 'PAGE':
           print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%")
   
   def GetResults(jobId, file_name):
       maxResults = 1000
       paginationToken = None
       finished = False
   
       while finished == False:
   
           response = None
   
           if paginationToken == None:
               response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults)
           else:
               response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults,
                                                              NextToken=paginationToken)
   
           blocks = response['Blocks']
           table_csv = get_table_csv_results(blocks)
           output_file = file_name
           # replace content
           with open(output_file, "at") as fout:
               fout.write(table_csv)
           # show the results
           print('Detected Document Text')
           print('Pages: {}'.format(response['DocumentMetadata']['Pages']))
           print('OUTPUT TO CSV FILE: ', output_file)
   
           # Display block information
           for block in blocks:
               DisplayBlockInfo(block)
               print()
               print()
   
           if 'NextToken' in response:
               paginationToken = response['NextToken']
           else:
               finished = True
   
   
   def get_rows_columns_map(table_result, blocks_map):
       rows = {}
       for relationship in table_result['Relationships']:
           if relationship['Type'] == 'CHILD':
               for child_id in relationship['Ids']:
                   try:
                       cell = blocks_map[child_id]
                       if cell['BlockType'] == 'CELL':
                           row_index = cell['RowIndex']
                           col_index = cell['ColumnIndex']
                           if row_index not in rows:
                               # create new row
                               rows[row_index] = {}
   
                           # get the text value
                           rows[row_index][col_index] = get_text(cell, blocks_map)
                   except KeyError:
                       print("Error extracting Table data - {}:".format(KeyError))
                       pass
       return rows
   
   
   def get_text(result, blocks_map):
       text = ''
       if 'Relationships' in result:
           for relationship in result['Relationships']:
               if relationship['Type'] == 'CHILD':
                   for child_id in relationship['Ids']:
                       try:
                           word = blocks_map[child_id]
                           if word['BlockType'] == 'WORD':
                               text += word['Text'] + ' '
                           if word['BlockType'] == 'SELECTION_ELEMENT':
                               if word['SelectionStatus'] == 'SELECTED':
                                   text += 'X '
                       except KeyError:
                           print("Error extracting Table data - {}:".format(KeyError))
   
       return text
   
   
   def get_table_csv_results(blocks):
   
       pprint(blocks)
   
       blocks_map = {}
       table_blocks = []
       for block in blocks:
           blocks_map[block['Id']] = block
           if block['BlockType'] == "TABLE":
               table_blocks.append(block)
   
       if len(table_blocks) <= 0:
           return "<b> NO Table FOUND </b>"
   
       csv = ''
       for index, table in enumerate(table_blocks):
           csv += generate_table_csv(table, blocks_map, index + 1)
           csv += '\n\n'
   
       return csv
   
   
   def generate_table_csv(table_result, blocks_map, table_index):
       rows = get_rows_columns_map(table_result, blocks_map)
   
       table_id = 'Table_' + str(table_index)
   
       # get cells.
       csv = 'Table: {0}\n\n'.format(table_id)
   
       for row_index, cols in rows.items():
   
           for col_index, text in cols.items():
               csv += '{}'.format(text) + ","
           csv += '\n'
   
       csv += '\n\n\n'
       return csv
   
   response_blocks = GetResults(jobId, file_name)
   ```

1. 코드를 실행합니다.

   결과를 얻은 후에는 관련 SNS 및 SQS 리소스를 삭제해야 합니다. 그렇지 않으면 해당 리소스에 대한 요금이 발생할 수 있습니다.

------