

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 從 Amazon S3 載入串流資料
<a name="integrations-s3-lambda"></a>

您可以使用 Lambda 將資料從 Amazon S3 傳送到您的 OpenSearch Service 網域。送達 S3 儲存貯體的新資料會觸發 Lambda 的事件通知，然後執行您的自訂程式碼以執行索引。

這個串流資料方法非常靈活。您可以[ 索引物件中繼資料 ](https://aws.amazon.com/blogs/database/indexing-metadata-in-amazon-elasticsearch-service-using-aws-lambda-and-python/)，若是純文字的話，便可剖析和索引一些物件本體元素。本節包含一些簡單的 Python 範本程式碼，其使用規則表達式來剖析日誌檔案和索引比對。

## 先決條件
<a name="integrations-s3-lambda-prereq"></a>

繼續之前，您必須準備好以下資源。


****  

| 先決條件 | Description | 
| --- | --- | 
| Amazon S3 儲存貯體 | 如需詳細資訊，請參閱 Amazon Simple Storage Service 使用者指南中的[建立您的第一個 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/CreatingABucket.html)。儲存貯體必須位於與您的 OpenSearch Service 網域相同的區域中。 | 
| OpenSearch Service 網域 | 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊，請參閱[建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)。 | 

## 建立 Lambda 部署套件
<a name="integrations-s3-lambda-deployment-package"></a>

部署套件是指包含您的程式碼及其相依存項目的 ZIP 或 JAR 檔案。本節包括 Python 範本程式碼。如需其他程式設計語言，請參閱 *AWS Lambda 開發人員指南*中的 [Lambda 部署套件](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-package.html)。

1. 建立目錄。在此範例中，我們使用 `s3-to-opensearch` 這個名稱。

1. 在目錄中建立名為 `sample.py` 的檔案：

   ```
   import boto3
   import re
   import requests
   from requests_aws4auth import AWS4Auth
   
   region = '' # e.g. us-west-1
   service = 'es'
   credentials = boto3.Session().get_credentials()
   awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
   
   host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
   index = 'lambda-s3-index'
   datatype = '_doc'
   url = host + '/' + index + '/' + datatype
   
   headers = { "Content-Type": "application/json" }
   
   s3 = boto3.client('s3')
   
   # Regular expressions used to parse some simple log lines
   ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)')
   time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
   message_pattern = re.compile('\"(.+)\"')
   
   # Lambda execution starts here
   def handler(event, context):
       for record in event['Records']:
   
           # Get the bucket name and key for the new file
           bucket = record['s3']['bucket']['name']
           key = record['s3']['object']['key']
   
           # Get, read, and split the file into lines
           obj = s3.get_object(Bucket=bucket, Key=key)
           body = obj['Body'].read()
           lines = body.splitlines()
   
           # Match the regular expressions to each line and index the JSON
           for line in lines:
               line = line.decode("utf-8")
               ip = ip_pattern.search(line).group(1)
               timestamp = time_pattern.search(line).group(1)
               message = message_pattern.search(line).group(1)
   
               document = { "ip": ip, "timestamp": timestamp, "message": message }
               r = requests.post(url, auth=awsauth, json=document, headers=headers)
   ```

   編輯 `region` 和 `host` 的變數。

1. [安裝 pip](https://pip.pypa.io/en/stable/installation/) (如果您尚未安裝的話)，然後將相依項目安裝到新的 `package` 目錄：

   ```
   cd s3-to-opensearch
   
   pip install --target ./package requests
   pip install --target ./package requests_aws4auth
   ```

   所有 Lambda 執行環境均已安裝 [Boto3](https://aws.amazon.com/sdk-for-python/)，因此您不需要將其包含在您的部署套件中。

1. 封裝應用程式的程式碼和相依性：

   ```
   cd package
   zip -r ../lambda.zip .
   
   cd ..
   zip -g lambda.zip sample.py
   ```

## 建立 Lambda 函式
<a name="integrations-s3-lambda-create"></a>

建立部署套件後，可建立 Lambda 函數。當您建立函數時，請選擇名稱、執行時間 (例如，Python 3.8) 和 IAM 角色。IAM 角色定義函數的許可。如需詳細說明，請參閱 *AWS Lambda 開發人員指南*中的[使用主控台建立 Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/get-started-create-function.html)。

此範例假設您正在使用主控台。選擇 Python 3.9 和具有 S3 讀取許可和 OpenSearch Service 寫入許可的角色，如以下螢幕擷取畫面所示：

![\[Lambda 函數的範本組態\]](http://docs.aws.amazon.com/zh_tw/opensearch-service/latest/developerguide/images/lambda-function.png)


在建立函數，您必須新增觸發。在此範例中，我們希望只要日誌檔案送達 S3 儲存貯體，便執行程式碼：

1. 選擇 **Add trigger** (新增觸發條件)，然後選取 **S3**。

1. 選擇您的儲存貯體。

1. 針對 **Event type (事件類型)** 選擇 **PUT**。

1. 針對 **Prefix (字首)** 輸入 `logs/`。

1. 對於 **Suffix** (尾碼)，輸入 `.log`。

1. 確認遞迴叫用警告，然後選擇 **Add** (新增)。

最後，您可以上傳部署套件：

1. 選擇 **Upload from** (上傳自) 和 **.zip file** (.zip 檔案)，然後依照提示上傳您的部署套件。

1. 上傳完成後，編輯 **Runtime settings** (執行時間設定)，然後將 **Handler** (處理常式) 變更為 `sample.handler`。此設定通知 Lambda 應該在觸發後執行的檔案 (`sample.py`) 和方法 (`handler`)。

此時，您有一整組的資源：日誌檔案的儲存貯體、只要日誌檔案新增到儲存貯體便執行的函數、執行剖析和索引的程式碼，以及可搜尋和視覺化的 OpenSearch Service 網域。

## 測試 Lambda 函數
<a name="integrations-s3-lambda-configure"></a>

在建立函數之後，您可以進行測試，做法是上傳檔案到 Amazon S3 儲存貯體。使用下列範例日誌行，建立名為 `sample.log` 的檔案：

```
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg"
12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
```

上傳檔案到您的 S3 儲存貯體的 `logs` 資料夾。如需指示說明，請參閱 *Amazon Simple Storage Service 使用者指南*中的[將物件上傳至您的儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/PuttingAnObjectInABucket.html)。

然後使用 OpenSearch Service 主控台或 OpenSearch Dashboards 來確認 `lambda-s3-index` 索引包含兩個文件。您也可以提出標準搜尋請求：

```
GET https://domain-name/lambda-s3-index/_search?pretty
{
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vTYXaWIBJWV_TTkEuSDg",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.91",
          "message" : "GET /some-file.jpg",
          "timestamp" : "10/Oct/2000:14:56:14 -0700"
        }
      },
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vjYmaWIBJWV_TTkEuCAB",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.90",
          "message" : "PUT /some-file.jpg",
          "timestamp" : "10/Oct/2000:13:55:36 -0700"
        }
      }
    ]
  }
}
```