Amazon S3 イベント通知を使用した加速クロール - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon S3 イベント通知を使用した加速クロール

Amazon S3 または Data Catalog ターゲットからオブジェクトを一覧表示する代わりに、Amazon S3 イベントを使用して変更を検索するようにクローラーを設定できます。この機能は、Amazon S3 または Data Catalog ターゲット全体を一覧表示するのではなく、Amazon S3 イベントを使用してイベントをトリガーしたサブフォルダからのすべてのファイルを一覧表示して 2 つのクロール間の変更を識別することによって、再クロール時間を短縮します。

最初のクロールでは、ターゲットからのすべての Amazon S3 オブジェクトを一覧表示します。最初のクロールの成功後、手動または設定されたスケジュールでリクロールを選択できます。クローラーは、すべてのオブジェクトをリストするのではなく、それらのイベントのオブジェクトのみをリストします。

Amazon S3 イベントベースのクローラーに移行する利点は以下のとおりです。

  • ターゲットからのすべてのオブジェクトの一覧表示を要しない場合は、より速く再クロールできます。その代わりに、オブジェクトが追加または削除される特定のフォルダが一覧表示されます。

  • オブジェクトが追加または削除される特定のフォルダの一覧表示を行うと、全体的なクロールコストが削減されます。

Amazon S3 イベントクロールは、クローラーのスケジュールに基づいて SQS キューから Amazon S3 イベントを使うことで実行します。キューにイベントがない場合、費用はかかりません。Amazon S3 イベントは、SQS キューに直接送信するように設定できます。また、複数のコンシューマーが同じイベント、SNS と SQS の組み合わせを必要とする場合にも設定できます。詳細については、「Amazon S3 イベント通知のアカウントを設定します。」を参照してください。

イベントモードでクローラーを作成して設定した後の最初のクロールは、Amazon S3 または Data Catalog ターゲットの完全な一覧表示を行う一覧表示モードで実行されます。次のログは、最初に成功したクロール、「クロールは Amazon S3 イベントを使用して実行します」の後に、Amazon S3 イベントを使用してクロールのオペレーションを確認します。

Amazon S3 イベントクロールを作成し、クロールに影響を与える可能性のあるクローラーのプロパティを更新すると、クロールがリストモードで動作し、「クロールは S3 イベントモードで実行されていません」というログが追加されます。

注記

クロールごとに消費するメッセージの最大数は 10,000 メッセージです。

カタログターゲット

ターゲットが Data Catalog の場合、クローラーは変更内容 (テーブル内の追加パーティションなど) で Data Catalog 内の既存のテーブルを更新します。

Amazon S3 イベント通知のアカウントを設定します。

このセクションでは、Amazon S3 イベント通知用にアカウントを設定する方法について説明します。また、スクリプトまたは AWS Glue コンソールを使用して行う手順について説明します。

前提条件

以下の設定タスクを実行します。括弧内の値は、スクリプトの構成可能な設定を参照している点に注意してください。

  1. Amazon S3 バケットを作成します (s3_bucket_name)。

  2. 識別バケットのパスであるクローラーターゲット (folder_name「test1」などの) を識別します。

  3. クローラー名を準備します (crawler_name)

  4. クローラー名と同じである可能性がある SNS トピック名 (sns_topic_name) を準備します。

  5. クローラーが実行し、S3 バケットが存在する AWS リージョンを準備します。 (region)

  6. Amazon S3 イベントを取得するためにメールアドレスを使用する場合は、オプションでメールアドレスを準備します(subscribing_email)。

CloudFormation スタックを使用してリソースを作成することもできます。以下のステップを実行します。

  1. 米国東部 (バージニア北部) で CloudFormation スタックを起動します。

  2. [パラメータ] に、Amazon S3 バケットの名前 (アカウント番号を含む) を入力します。

  3. I acknowledge that AWS CloudFormation might create IAM resources with custom names を選択します。

  4. [Create stack] を選択します。

制限:

  • ターゲットが Amazon S3 または Data Catalog であるかにかかわらず、クローラーがサポートするのは単一のターゲットのみです。

  • プライベート VPC の SQS はサポートされていません。

  • Amazon S3 サンプリングはサポートされていません。

  • クローラーターゲットは、Amazon S3 ターゲットの場合はフォルダ、Data Catalog ターゲットの場合は 1 つ、または複数の AWS Glue Data Catalog テーブルにする必要があります。

  • 「すべての」パスのワイルドカードをサポートしていません: s3: //%

  • Data Catalog ターゲットの場合、Amazon S3 イベントモードでは、すべてのカタログテーブルが同じ Amazon S3 バケットをポイントする必要があります。

  • Data Catalog ターゲットの場合、カタログテーブルは Delta Lake 形式の Amazon S3 ロケーションをポイントしない必要があります (_symlink フォルダが含まれる、またはカタログテーブルの InputFormat を確認)。

Amazon S3 イベントベースのクローラーを使用するには、S3 ターゲットと同じプレフィックスからフィルタリングされたイベント付きの S3 バケットでイベント通知を有効にし、SQS に保存する必要があります。SQS とイベント通知は、[チュートリアル:通知のバケットを設定する] または、SQS を生成し、ターゲットから Amazon S3 イベントを設定するスクリプト。 の手順に従って、コンソールから設定できます。

SQS ポリシー

クローラーが使用するロールに添付する必要がある次の SQS ポリシーを追加します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "arn:aws:sqs:{region}:{accountID}:cfn-sqs-queue" } ] }

SQS を生成し、ターゲットから Amazon S3 イベントを設定するスクリプト。

前提条件が満たされていることを確認したら、以下の Python スクリプトを実行して SQS を作成します。構成可能な設定を、前提条件から用意された名前に置き換えます。

注記

スクリプトを実行した後、SQS コンソールにログインして、作成した SQS の ARN を見つけます。

Amazon SQS は、可視性タイムアウト、つまり Amazon SQS が他のカスタマーがそのメッセージの受信や処理できなくなる期間を設定します。可視性タイムアウトをクロールのランタイム時間とほぼ等しく設定します。

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

コンソールを使用した Amazon S3 イベント通知用のクローラーの設定 (Amazon S3 ターゲット)

Amazon S3 ターゲットのために AWS Glue コンソールを使用して Amazon S3 イベント通知用のクローラーを設定するには、以下の手順を実行します。

  1. クローラーのプロパティを設定します。詳細については、「AWS Glue コンソールでのクローラー設定オプションの設定」を参照してください。

  2. [データソースの設定] セクションに、[データは AWS Glue テーブルにマッピング済みですか?] という質問が表示されています。

    デフォルトでは、[Not yet] (まだです) が選択されています。Amazon S3 のデータソースを使用しており、データがまだ AWS Glue テーブルにマップされていないため、これはデフォルトままにしておきます。

  3. [Data sources] (データソース) セクションで、[Add a data source] (データソースを追加) を選択します。

    Data source configuration interface with options to select or add data sources for crawling.
  4. [Add data source] (データソースの追加) ダイアログで、Amazon S3 データソースを以下のように設定します。

    • [Data source] (データソース): デフォルトで、Amazon S3 が選択されています。

    • [Network connection] (ネットワーク接続) (オプション): [Add new connection] (新しい接続を追加) を選択します。

    • [Location of Amazon S3 data] (Amazon S3 データの場所): デフォルトで、[In this account] (このアカウント内) が選択されています。

    • [Amazon S3 path] (Amazon S3 パス): フォルダとファイルがクロールされる Amazon S3 パスを指定します。

    • [Subsequent crawler runs] (それ以降のクローラー実行): クローラーに関する Amazon S3 イベント通知を使用するには、[Crawl based on events] (イベントに基づくクロール) を選択します。

    • [Include SQS ARN] (SQS ARN を含める): 有効な SQS ARN を含むデータストアパラメータを指定します。(例えば、arn:aws:sqs:region:account:sqs)

    • [Include dead-letter SQS ARN] (配信不能 SQS ARN を含める) (オプション): 有効な Amazon 配信不能 SQS ARN を指定します。(例えば、arn:aws:sqs:region:account:deadLetterQueue)

    • [Add an Amazon S3 data source] (Amazon S3 データソースを追加) を選択します。

    Add data source dialog for S3, showing options for network connection and crawl settings.

AWS CLI を使用した Amazon S3 イベント通知のクローラーの設定

次に示すのは、SQS キューを作成し、Amazon S3 ターゲットバケットでイベント通知を設定するための Amazon S3 AWS CLI コールの例です。

S3 Event AWS CLI aws sqs create-queue --queue-name MyQueue --attributes file://create-queue.json create-queue.json ``` { "Policy": { "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "example-statement-ID", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": [ "SQS:SendMessage" ], "Resource": "SQS-queue-ARN", "Condition": { "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1" }, "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" } } } ] } } ``` aws s3api put-bucket-notification-configuration --bucket customer-data-pdx --notification-configuration file://s3-event-config.json s3-event-config.json ``` { "QueueConfigurations": [ { "Id": "s3event-sqs-queue", "QueueArn": "arn:aws:sqs:{region}:{account}:queuename", "Events": [ "s3:ObjectCreated:*", "s3:ObjectRemoved:*" ], "Filter": { "Key": { "FilterRules": [ { "Name": "Prefix", "Value": "/json" } ] } } } ] } ``` Create Crawler:

コンソールを使用した Amazon S3 イベント通知用のクローラーの設定 (Data Catalog ターゲット)

カタログターゲットがあるときは、AWS Glue コンソールを使用して Amazon S3 イベント通知用のクローラーを設定します。

  1. クローラーのプロパティを設定します。詳細については、「AWS Glue コンソールでのクローラー設定オプションの設定」を参照してください。

  2. [データソースの設定] セクションに、[データは AWS Glue テーブルにマッピング済みですか?] という質問が表示されています。

    [Yes] (はい) を選択して、Data Catalog からの既存のテーブルをデータソースとして選択します。

  3. [Glue tables] (Glue テーブル) セクションで、[Add tables] (テーブルを追加する) を選択します。

    Data source configuration interface with options to select existing Glue tables or add new ones.
  4. [Add table] (テーブルを追加する) モーダルで、データベースとテーブルを設定します。

    • [Network connection] (ネットワーク接続) (オプション): [Add new connection] (新しい接続を追加) を選択します。

    • [Database] (データベース): Data Catalog 内のデータベースを選択します。

    • [Tables] (テーブル): Data Catalog 内のデータベースから 1 つ、または複数のテーブルを選択します。

    • [Subsequent crawler runs] (それ以降のクローラー実行): クローラーに関する Amazon S3 イベント通知を使用するには、[Crawl based on events] (イベントに基づくクロール) を選択します。

    • [Include SQS ARN] (SQS ARN を含める): 有効な SQS ARN を含むデータストアパラメータを指定します。(例えば、arn:aws:sqs:region:account:sqs)

    • [Include dead-letter SQS ARN] (配信不能 SQS ARN を含める) (オプション): 有効な Amazon 配信不能 SQS ARN を指定します。(例えば、arn:aws:sqs:region:account:deadLetterQueue)

    • [確認] を選択します。

    Add Glue tables dialog with network, database, tables, and crawler options.