翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon S3 からストリーミングデータをロードする
Lambda を使用して、Amazon S3 から OpenSearch Service ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。
このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成
前提条件
続行する前に、以下のリソースが必要です。
前提条件 | 説明 |
---|---|
Amazon S3 バケット | 詳細については、Amazon Simple Storage Service ユーザーガイドの「最初の S3 バケットを作成する」を参照してください。バケットは、OpenSearch Service ドメインと同じリージョンに存在する必要があります。 |
OpenSearch Service ドメイン | Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch Service ドメインの作成」を参照してください。 |
Lambda デプロイパッケージを作成する
デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、AWS Lambda デベロッパーガイドの「Lambda デプロイパッケージ」を参照してください。
-
ディレクトリを作成します。このサンプルでは、名前
s3-to-opensearch
を使用します。 -
sample.py
という名前のディレクトリ内にファイルを作成します。import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)
region
とhost
の変数を編集します。 -
まだ持っていない場合、pip をインストール
してから、依存関係を新しい package
ディレクトリにインストールします。cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
すべての Lambda 実行環境に Boto3
がインストールされているため、デプロイパッケージに含める必要はありません。 -
アプリケーションコードや相互依存性をパッケージ化します。
cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py
Lambda 関数を作成する
デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 3.8)、IAM ロールを選択します。IAM ロールにより、関数の許可が定義されます。詳細な手順については、AWS Lambda デベロッパーガイドの「コンソールで Lambda 関数を作成する」を参照してください。
この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 3.9 と、S3 読み取り許可および OpenSearch Service 書き込み許可を持つロールを選択します。

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。
-
[トリガーの追加] を選択し、[S3] を選択します。
-
バケットを選択します。
-
[イベントタイプ] で、[PUT] を選択します。
-
[プレフィックス] に
logs/
と入力します。 -
[サフィックス] では、
.log
と入力します。 -
再帰呼び出しの警告を確認し、[追加] を選択します。
最後に、デプロイパッケージをアップロードすることができます。
-
[~からアップロード] および [.zip ファイルをアップロード] を選択してから、デプロイパッケージをアップロードするプロンプトに従います。
-
アップロードが終了したら、[ランタイム設定] を編集し、[ハンドラ] を
sample.handler
に変更します。この設定により、トリガー後に実行するファイル (sample.py
) およびメソッド (handler
) が Lambda に通知されます。
この時点で、リソースがすべて揃いました (ログファイルのバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索および可視化のための OpenSearch Service ドメイン)。
Lambda 関数をテストする
関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log
という名前のファイルを作成します。
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
S3 バケットの logs
フォルダにファイルをアップロードします。手順については、Amazon Simple Storage Service ユーザーガイドの「バケットにオブジェクトをアップロードする」を参照してください。
次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-s3-index
インデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。
GET https://domain-name
/lambda-s3-index/_search?pretty
{
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vTYXaWIBJWV_TTkEuSDg",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.91",
"message" : "GET /some-file.jpg",
"timestamp" : "10/Oct/2000:14:56:14 -0700"
}
},
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vjYmaWIBJWV_TTkEuCAB",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.90",
"message" : "PUT /some-file.jpg",
"timestamp" : "10/Oct/2000:13:55:36 -0700"
}
}
]
}
}