

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 处理受限的呼叫和断开的连接
<a name="handling-errors"></a>

如果您超过每秒最大交易次数 (TPS)、导致服务限制您的应用程序，或者连接断开，Amazon Textract 操作可能会失败。例如，如果您在短时间内对 Amazon Textract 操作进行了太多调用，则会限制您的呼叫并发送`ProvisionedThroughputExceededException`操作响应中出现错误。有关 Amazon Textract TPS 配额的信息，请参阅[Amazon Textract 配额](https://docs.aws.amazon.com/general/latest/gr/textract.html).

您可以通过自动重试操作来管理限制和断开的连接。您可以通过包括`Config`当您创建 Amazon Textract 客户端时，将参数。我们建议重试计数为 5。这些区域有：AWS在失败并引发异常之前，开发工具包会重试操作指定次数。有关更多信息，请参阅 。[AWS 中的错误重试和指数退避](https://docs.aws.amazon.com/general/latest/gr/api-retries.html).

**注意**  
自动重试适用于同步操作和异步操作。在指定自动重试之前，请确保您拥有最新版本的 AWS 开发工具包。有关更多信息，请参阅 [第 2 步：设置AWS CLI和AWS软件开发工具包](setup-awscli-sdk.md)。

以下示例说明了当您处理多个文档时，如何自动重试 Amazon Textract 操作。

**先决条件**
+ 如果您尚未执行以下操作，请：

  1. 使用创建或更新 IAM 用户`AmazonTextractFullAccess`和`AmazonS3ReadOnlyAccess`权限。有关更多信息，请参阅 [第 1 步：设置 AWS 账户并创建 IAM 用户](setting-up.md#setting-up-iam)。

  1. 安装和配置 AWS CLI 和 AWS 开发工具包。有关更多信息，请参阅 [第 2 步：设置AWS CLI和AWS软件开发工具包](setup-awscli-sdk.md)。

**自动重试操作**

1. 将多个文档图像上传到 S3 存储桶以运行同步示例。将多页文档上传到 S3 存储桶并运行`StartDocumentTextDetection`在它上面运行异步示例。

   有关说明，请参阅[将对象上传到 Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/UploadingObjectsintoAmazonS3.html)中的*Amazon Simple Storage Service 用户指南*.

1. 以下示例演示了如何使用`Config`参数以自动重试操作。同步示例调用`DetectDocumentText`操作，而异步示例调用`GetDocumentTextDetection`operation.

------
#### [ Sync Example ]

   使用以下示例调用`DetectDocumentText`对 Amazon S3 存储桶中的文档进行操作。In`main`，更改的值`bucket`到您的 S3 存储桶。更改的值`documents`转到您在步骤 2 中上传的文档图像的名称。

   ```
   import boto3
   from botocore.client import Config
   # Documents
   
   def process_multiple_documents(bucket, documents):
       
       config = Config(retries = dict(max_attempts = 5))
    
       # Amazon Textract client
       textract = boto3.client('textract', config=config)
    
       for documentName in documents:
    
           print("\nProcessing: {}\n==========================================".format(documentName))
    
           # Call Amazon Textract
           response = textract.detect_document_text(
               Document={
                   'S3Object': {
                       'Bucket': bucket,
                       'Name': documentName
                   }
               })
    
           # Print detected text
           for item in response["Blocks"]:
               if item["BlockType"] == "LINE":
                   print ('\033[94m' +  item["Text"] + '\033[0m')
   
   
   def main():
       bucket = ""
       documents = ["document-image-1.png",
       "document-image-2.png", "document-image-3.png",
       "document-image-4.png", "document-image-5.png" ]
       process_multiple_documents(bucket, documents)
   
   
   
   if __name__ == "__main__":
       main()
   ```

------
#### [ Async Example ]

   使用以下示例调用 `GetDocumentTextDetection` 操作。假定您已经打过电话`StartDocumentTextDetection`在您的 Amazon S3 存储桶中的文档上，并获得了`JobId`. In`main`，更改的值`bucket`对于您的 S3 存储桶和的值`roleArn`转到分配给你的 Textract 角色的 Arn。您还需要更改的值`document`将替换为您的 Amazon S3 存储桶中的多页文档的名称。最后，将值替换为`region_name`提供您所在区域的名称并提供`GetResults`以您的名称为的函数`jobId`.

   ```
   import boto3
   from botocore.client import Config
   
   class DocumentProcessor:
       jobId = ''
       region_name = ''
   
       roleArn = ''
       bucket = ''
       document = ''
   
       sqsQueueUrl = ''
       snsTopicArn = ''
       processType = ''
   
       def __init__(self, role, bucket, document, region):
           self.roleArn = role
           self.bucket = bucket
           self.document = document
           self.region_name = region
           self.config = Config(retries = dict(max_attempts = 5))
   
           self.textract = boto3.client('textract', region_name=self.region_name, config=self.config)
           self.sqs = boto3.client('sqs')
           self.sns = boto3.client('sns')
   
   # Display information about a block
       def DisplayBlockInfo(self, block):
   
           print("Block Id: " + block['Id'])
           print("Type: " + block['BlockType'])
           if 'EntityTypes' in block:
               print('EntityTypes: {}'.format(block['EntityTypes']))
   
           if 'Text' in block:
               print("Text: " + block['Text'])
   
           if block['BlockType'] != 'PAGE':
               print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%")
   
           print('Page: {}'.format(block['Page']))
   
           if block['BlockType'] == 'CELL':
               print('Cell Information')
               print('\tColumn: {} '.format(block['ColumnIndex']))
               print('\tRow: {}'.format(block['RowIndex']))
               print('\tColumn span: {} '.format(block['ColumnSpan']))
               print('\tRow span: {}'.format(block['RowSpan']))
   
               if 'Relationships' in block:
                   print('\tRelationships: {}'.format(block['Relationships']))
   
           print('Geometry')
           print('\tBounding Box: {}'.format(block['Geometry']['BoundingBox']))
           print('\tPolygon: {}'.format(block['Geometry']['Polygon']))
   
           if block['BlockType'] == 'SELECTION_ELEMENT':
               print('    Selection element detected: ', end='')
               if block['SelectionStatus'] == 'SELECTED':
                   print('Selected')
               else:
                   print('Not selected')
   
       def GetResults(self, jobId):
           maxResults = 1000
           paginationToken = None
           finished = False
   
           while finished == False:
   
               response = None
   
               if paginationToken == None:
                   response = self.textract.get_document_text_detection(JobId=jobId,
                                                                            MaxResults=maxResults)
               else:
                   response = self.textract.get_document_text_detection(JobId=jobId,
                                                                            MaxResults=maxResults,
                                                                            NextToken=paginationToken)
   
               blocks = response['Blocks']
               print('Detected Document Text')
               print('Pages: {}'.format(response['DocumentMetadata']['Pages']))
   
               # Display block information
               for block in blocks:
                   self.DisplayBlockInfo(block)
                   print()
                   print()
   
               if 'NextToken' in response:
                   paginationToken = response['NextToken']
               else:
                   finished = True
   
   def main():
       roleArn = 'role-arn'
       bucket = 'bucket-name'
       document = 'document-name'
       region_name = 'region-name'
       analyzer = DocumentProcessor(roleArn, bucket, document, region_name)
       analyzer.GetResults("job-id")
   
   if __name__ == "__main__":
       main()
   ```

------