Amazon S3 からストリーミングデータをロードする - Amazon OpenSearch Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon S3 からストリーミングデータをロードする

Lambda を使用して、Amazon S3 から OpenSearch Service ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon S3 バケット 詳細については、Amazon Simple Storage Service ユーザーガイドの「最初の S3 バケットを作成する」を参照してください。バケットは、OpenSearch Service ドメインと同じリージョンに存在する必要があります。
OpenSearch Service ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch Service ドメインの作成」を参照してください。

Lambda デプロイパッケージを作成する

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、AWS Lambda デベロッパーガイドの「Lambda デプロイパッケージ」を参照してください。

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-opensearch を使用します。

  2. sample.py という名前のディレクトリ内にファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. まだ持っていない場合、pip をインストールしてから、依存関係を新しい package ディレクトリにインストールします。

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    すべての Lambda 実行環境に Boto3 がインストールされているため、デプロイパッケージに含める必要はありません。

  4. アプリケーションコードや相互依存性をパッケージ化します。

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Lambda 関数を作成する

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 3.8)、IAM ロールを選択します。IAM ロールにより、関数の許可が定義されます。詳細な手順については、AWS Lambda デベロッパーガイドの「コンソールで Lambda 関数を作成する」を参照してください。

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 3.9 と、S3 読み取り許可および OpenSearch Service 書き込み許可を持つロールを選択します。

Lambda 関数のサンプル設定

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。

  1. [トリガーの追加] を選択し、[S3] を選択します。

  2. バケットを選択します。

  3. [イベントタイプ] で、[PUT] を選択します。

  4. [プレフィックス] に logs/ と入力します。

  5. [サフィックス] では、.log と入力します。

  6. 再帰呼び出しの警告を確認し、[追加] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. [~からアップロード] および [.zip ファイルをアップロード] を選択してから、デプロイパッケージをアップロードするプロンプトに従います。

  2. アップロードが終了したら、[ランタイム設定] を編集し、[ハンドラ] を sample.handler に変更します。この設定により、トリガー後に実行するファイル (sample.py) およびメソッド (handler) が Lambda に通知されます。

この時点で、リソースがすべて揃いました (ログファイルのバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索および可視化のための OpenSearch Service ドメイン)。

Lambda 関数をテストする

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、Amazon Simple Storage Service ユーザーガイドの「バケットにオブジェクトをアップロードする」を参照してください。

次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-s3-index インデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }