

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將資料擷取至 Amazon OpenSearch Serverless 集合
<a name="serverless-clients"></a>

這些章節提供有關支援的擷取管道，以將資料擷取至 Amazon OpenSearch Serverless 集合的詳細資訊。同時還涵蓋了一些您可以用於與 OpenSearch API 操作互動的用戶端。您的用戶端應與 OpenSearch 2.x 相容，以便與 OpenSearch Serverless 整合。

**Topics**
+ [所需的最低許可](#serverless-ingestion-permissions)
+ [OpenSearch 擷取](#serverless-osis-ingestion)
+ [Fluent Bit](#serverless-fluentbit)
+ [Amazon Data Firehose](#serverless-kdf)
+ [Go](#serverless-go)
+ [Java](#serverless-java)
+ [JavaScript](#serverless-javascript)
+ [Logstash](#serverless-logstash)
+ [Python](#serverless-python)
+ [Ruby](#serverless-ruby)
+ [使用其他用戶端簽署 HTTP 請求](#serverless-signing)

## 所需的最低許可
<a name="serverless-ingestion-permissions"></a>

若要將資料擷取至 OpenSearch Serverless 集合，寫入資料的主體必須在[資料存取政策](serverless-data-access.md)中指派下列最低許可：

```
[
   {
      "Rules":[
         {
            "ResourceType":"index",
            "Resource":[
               "index/target-collection/logs"
            ],
            "Permission":[
               "aoss:CreateIndex",
               "aoss:WriteDocument",
               "aoss:UpdateIndex"
            ]
         }
      ],
      "Principal":[
         "arn:aws:iam::123456789012:user/my-user"
      ]
   }
]
```

如果您計劃寫入到其他索引，許可的範圍可能會更廣泛。例如，您可以允許對所有索引 (index/*target-collection*/\$1) 或索引子集合 (index/*target-collection*/*logs\$1*) 的許可，而不是指定單一目標索引。

如需所有可用 OpenSearch API 操作及其相關許可的參考資料，請參閱 [Amazon OpenSearch Serverless 中支援的操作和外掛程式](serverless-genref.md)。

## OpenSearch 擷取
<a name="serverless-osis-ingestion"></a>

您可以使用 Amazon OpenSearch Ingestion，而不是使用第三方用戶端直接將資料傳送至 OpenSearch Serverless 集合。 OpenSearch 您可以將資料生產者設定為將資料傳送至 OpenSearch Ingestion，並自動將資料交付至您指定的集合。您也可以設定 OpenSearch Ingestion，在交付資料之前轉換資料。如需詳細資訊，請參閱[Amazon OpenSearch 擷取概觀](ingestion.md)。

OpenSearch Ingestion 管道需要許可，才能寫入設定為接收器的 OpenSearch Serverless 集合。這些許可包括描述集合並向其傳送 HTTP 請求的能力。如需使用 OpenSearch Ingestion 將資料新增至集合的指示，請參閱 [授予 Amazon OpenSearch Ingestion 管道對集合的存取權](pipeline-collection-access.md)。

若要開始使用 OpenSearch Ingestion，請參閱 [教學課程：使用 Amazon OpenSearch Ingestion 將資料擷取至集合](osis-serverless-get-started.md)。

## Fluent Bit
<a name="serverless-fluentbit"></a>

您可以使用 [AWS for Fluent Bit 映像](https://github.com/aws/aws-for-fluent-bit#public-images)和 [OpenSearch 輸出外掛程式](https://docs.fluentbit.io/manual/pipeline/outputs/opensearch)，將資料擷取至 OpenSearch Serverless 集合。

**注意**  
您必須擁有 AWS 適用於 Fluent Bit 映像的 2.30.0 版或更新版本，才能與 OpenSearch Serverless 整合。

**範例組態**：

這個組態檔案的範例輸出區段顯示如何使用 OpenSearch Serverless 集合作為目的地。重要的補充是 `AWS_Service_Name` 參數，也就是 `aoss`。`Host` 是集合端點。

```
[OUTPUT]
    Name  opensearch
    Match *
    Host  collection-endpoint.us-west-2.aoss.amazonaws.com
    Port  443
    Index  my_index
    Trace_Error On
    Trace_Output On
    AWS_Auth On
    AWS_Region <region>
    AWS_Service_Name aoss
    tls     On
    Suppress_Type_Name On
```

## Amazon Data Firehose
<a name="serverless-kdf"></a>

Firehose 支援 OpenSearch Serverless 作為交付目的地。如需將資料傳送至 OpenSearch Serverless 的指示，請參閱《*Amazon* [Data Firehose 開發人員指南》中的建立 Kinesis Data Firehose 交付串流](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)和[為您的目的地選擇 OpenSearch Serverless](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-opensearch-serverless)。

您提供給 Firehose 交付的 IAM 角色必須在具有目標集合`aoss:WriteDocument`最低許可的資料存取政策中指定，而且您必須具有預先存在的索引才能傳送資料。如需詳細資訊，請參閱[所需的最低許可](#serverless-ingestion-permissions)。

將資料傳送至 OpenSearch Serverless 之前，您可能需要執行資料轉換。如需進一步了解如何使用 Lambda 函數來執行此任務，請參閱相同指南中的 [Amazon Kinesis Data Firehose 資料轉換](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)。

## Go
<a name="serverless-go"></a>

下列範本程式碼會使用 Go 的 [opensearch-go](https://github.com/opensearch-project/opensearch-go) 用戶端，建立與指定 OpenSearch Serverless 集合的安全連線，並建立單一索引。您必須提供 `region` 和 `host` 的值。

```
package main

import (
  "context"
  "log"
  "strings"
  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  opensearch "github.com/opensearch-project/opensearch-go/v2"
  opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
  requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)

const endpoint = "" // serverless collection endpoint

func main() {
	ctx := context.Background()

	awsCfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion("<AWS_REGION>"),
		config.WithCredentialsProvider(
			getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"),
		),
	)
	if err != nil {
		log.Fatal(err) // don't log.fatal in a production-ready app
	}

	// create an AWS request Signer and load AWS configuration using default config folder or env vars.
	signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless
	if err != nil {
		log.Fatal(err) // don't log.fatal in a production-ready app
	}

	// create an opensearch client and use the request-signer
	client, err := opensearch.NewClient(opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    signer,
	})
	if err != nil {
		log.Fatal("client creation err", err)
	}

	indexName := "go-test-index"

  // define index mapping
	mapping := strings.NewReader(`{
	 "settings": {
	   "index": {
	        "number_of_shards": 4
	        }
	      }
	 }`)

	// create an index
	createIndex := opensearchapi.IndicesCreateRequest{
		Index: indexName,
    Body: mapping,
	}
	createIndexResponse, err := createIndex.Do(context.Background(), client)
	if err != nil {
		log.Println("Error ", err.Error())
		log.Println("failed to create index ", err)
		log.Fatal("create response body read err", err)
	}
	log.Println(createIndexResponse)

	// delete the index
	deleteIndex := opensearchapi.IndicesDeleteRequest{
		Index: []string{indexName},
	}

	deleteIndexResponse, err := deleteIndex.Do(context.Background(), client)
	if err != nil {
		log.Println("failed to delete index ", err)
		log.Fatal("delete index response body read err", err)
	}
	log.Println("deleting index", deleteIndexResponse)
}

func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc {
	return func(ctx context.Context) (aws.Credentials, error) {
		c := &aws.Credentials{
			AccessKeyID:     accessKey,
			SecretAccessKey: secretAccessKey,
			SessionToken:    token,
		}
		return *c, nil
	}
}
```

## Java
<a name="serverless-java"></a>

下列範本程式碼會使用 Java 的 [opensearch-java](https://search.maven.org/artifact/org.opensearch.client/opensearch-java) 用戶端，建立與指定 OpenSearch Serverless 集合的安全連線，並建立單一索引。您必須提供 `region` 和 `host` 的值。

與 OpenSearch Service *網域*相較，重要的區別在於服務名稱 (是 `aoss` 而不是 `es`)。

```
// import OpenSearchClient to establish connection to OpenSearch Serverless collection
import org.opensearch.client.opensearch.OpenSearchClient;

SdkHttpClient httpClient = ApacheHttpClient.builder().build();
// create an opensearch client and use the request-signer
OpenSearchClient client = new OpenSearchClient(
    new AwsSdk2Transport(
        httpClient,
        "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint
        "aoss" // signing service name
        Region.US_WEST_2, // signing service region
        AwsSdk2TransportOptions.builder().build()
    )
);

String index = "sample-index";

// create an index
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
System.out.println("Create index reponse: " + createIndexResponse);

// delete the index
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build();
DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
System.out.println("Delete index reponse: " + deleteIndexResponse);

httpClient.close();
```

下列範例程式碼會再次建立安全連線，然後搜尋索引。

```
import org.opensearch.client.opensearch.OpenSearchClient;

SdkHttpClient httpClient = ApacheHttpClient.builder().build();

OpenSearchClient client = new OpenSearchClient(
    new AwsSdk2Transport(
        httpClient,
        "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint
        "aoss" // signing service name
        Region.US_WEST_2, // signing service region
        AwsSdk2TransportOptions.builder().build()
    )
);

Response response = client.generic()
    .execute(
        Requests.builder()
            .endpoint("/" + "users" + "/_search?typed_keys=true")
            .method("GET")
            .json("{"
                + "    \"query\": {"
                + "        \"match_all\": {}"
                + "    }"
                + "}")
            .build());

httpClient.close();
```

## JavaScript
<a name="serverless-javascript"></a>

下列範本程式碼會使用 JavaScript 的 [opensearch-js](https://www.npmjs.com/package/@opensearch-project/opensearch) 用戶端，建立與指定 OpenSearch Serverless 集合的安全連線，建立單一索引，新增文件，以及刪除索引。您必須提供 `node` 和 `region` 的值。

與 OpenSearch Service *網域*相較，重要的區別在於服務名稱 (是 `aoss` 而不是 `es`)。

------
#### [ Version 3 ]

此範例使用適用於 JavaScript in Node.js 的 SDK 的[版本 3](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/)。

```
const { defaultProvider } = require('@aws-sdk/credential-provider-node');
const { Client } = require('@opensearch-project/opensearch');
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws');

async function main() {
    // create an opensearch client and use the request-signer
    const client = new Client({
        ...AwsSigv4Signer({
            region: 'us-west-2',
            service: 'aoss',
            getCredentials: () => {
                const credentialsProvider = defaultProvider();
                return credentialsProvider();
            },
        }),
        node: '' # // serverless collection endpoint
    });

    const index = 'movies';

    // create index if it doesn't already exist
    if (!(await client.indices.exists({ index })).body) {
        console.log((await client.indices.create({ index })).body);
    }

    // add a document to the index
    const document = { foo: 'bar' };
    const response = await client.index({
        id: '1',
        index: index,
        body: document,
    });
    console.log(response.body);

    // delete the index
    console.log((await client.indices.delete({ index })).body);
}

main();
```

------
#### [ Version 2 ]

此範例使用適用於 JavaScript in Node.js 的 SDK 的[版本 2](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/)。

```
const AWS = require('aws-sdk');
const { Client } = require('@opensearch-project/opensearch');
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws');

async function main() {
    // create an opensearch client and use the request-signer
    const client = new Client({
        ...AwsSigv4Signer({
            region: 'us-west-2',
            service: 'aoss',
            getCredentials: () =>
                new Promise((resolve, reject) => {
                    AWS.config.getCredentials((err, credentials) => {
                        if (err) {
                            reject(err);
                        } else {
                            resolve(credentials);
                        }
                    });
                }),
        }),
        node: '' # // serverless collection endpoint
    });

    const index = 'movies';

    // create index if it doesn't already exist
    if (!(await client.indices.exists({ index })).body) {
        console.log((await client.indices.create({
            index
        })).body);
    }

    // add a document to the index
    const document = {
        foo: 'bar'
    };
    const response = await client.index({
        id: '1',
        index: index,
        body: document,
    });
    console.log(response.body);

    // delete the index
    console.log((await client.indices.delete({ index })).body);
}

main();
```

------

## Logstash
<a name="serverless-logstash"></a>

您可以使用 [Logstash OpenSearch 外掛程式](https://github.com/opensearch-project/logstash-output-opensearch)，將日誌發佈至 OpenSearch Serverless 集合。

**使用 Logstash 將資料傳送至 OpenSearch Serverless**

1. 使用 Docker *或 Linux 安裝 logstash-output-opensearch 外掛程式的 2.0.0 版或更新版本*。 [logstash-output-opensearch](https://github.com/opensearch-project/logstash-output-opensearch) 

------
#### [ Docker ]

   Docker 託管預先安裝了 OpenSearch 輸出外掛程式的 Logstash OSS 軟體：[opensearchproject/logstash-oss-with-opensearch-output-plugin](https://hub.docker.com/r/opensearchproject/logstash-oss-with-opensearch-output-plugin/tags?page=1&ordering=last_updated&name=8.4.0)。您可以像任何其他映像一樣提取映像：

   ```
   docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
   ```

------
#### [ Linux ]

   首先，如果您尚未[安裝最新版本的 Logstash](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)，請先安裝。然後，安裝輸出外掛程式的 2.0.0 版本：

   ```
   cd logstash-8.5.0/
   bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch
   ```

   如果外掛程式已安裝，請將其更新至最新版本：

   ```
   bin/logstash-plugin update logstash-output-opensearch 
   ```

   從 2.0.0 版的外掛程式開始， AWS 軟體開發套件使用 第 3 版。如果您使用的是早於 8.4.0 的 Logstash 版本，則必須移除任何預先安裝的 AWS 外掛程式，並安裝`logstash-integration-aws`外掛程式：

   ```
   /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3
   /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs
   /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3
   /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns
   /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs
   /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch
   
   /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
   ```

------

1. 若要讓 OpenSearch 輸出外掛程式與 OpenSearch Serverless 搭配使用，您必須對 logstash.conf 的 `opensearch` 輸出區段進行下列修改：
   + 將 `aoss` 指定為 `auth_type` 下的 `service_name`。
   + 針對 `hosts` 指定您的集合端點。
   + 新增參數 `default_server_major_version` 和 `legacy_template`。外掛程式需要這些參數才能與 OpenSearch Serverless 搭配使用。

   ```
   output {
     opensearch {
       hosts => "collection-endpoint:443"
       auth_type => {
         ...
         service_name => 'aoss'
       }
       default_server_major_version => 2
       legacy_template => false
     }
   }
   ```

   此組態檔案範例會從 S3 儲存貯體中的檔案取得輸入，並將其傳送至 OpenSearch Serverless 集合：

   ```
   input {
     s3  {
       bucket => "my-s3-bucket"
       region => "us-east-1"
     }
   }
   
   output {
     opensearch {
       ecs_compatibility => disabled
       hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443"
       index => my-index
       auth_type => {
         type => 'aws_iam'
         aws_access_key_id => 'your-access-key'
         aws_secret_access_key => 'your-secret-key'
         region => 'us-east-1'
         service_name => 'aoss'
       }
       default_server_major_version => 2
       legacy_template => false
     }
   }
   ```

1. 然後，使用新的組態執行 Logstash 來測試外掛程式：

   ```
   bin/logstash -f config/test-plugin.conf
   ```

## Python
<a name="serverless-python"></a>

下列範本程式碼會使用 Python 的 [opensearch-py](https://pypi.org/project/opensearch-py/) 用戶端，建立與指定 OpenSearch Serverless 集合的安全連線，建立單一索引並搜尋該索引。您必須提供 `region` 和 `host` 的值。

與 OpenSearch Service *網域*相較，重要的區別在於服務名稱 (是 `aoss` 而不是 `es`)。

```
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3

host = ''  # serverless collection endpoint, without https://
region = ''  # e.g. us-east-1

service = 'aoss'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)

# create an opensearch client and use the request-signer
client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    pool_maxsize=20,
)

# create an index
index_name = 'books-index'
create_response = client.indices.create(
    index_name
)

print('\nCreating index:')
print(create_response)

# index a document
document = {
  'title': 'The Green Mile',
  'director': 'Stephen King',
  'year': '1996'
}

response = client.index(
    index = 'books-index',
    body = document,
    id = '1'
)


# delete the index
delete_response = client.indices.delete(
    index_name
)

print('\nDeleting index:')
print(delete_response)
```

## Ruby
<a name="serverless-ruby"></a>

`opensearch-aws-sigv4` Gem 套件提供了立即使用的 OpenSearch Serverless 的存取，以及 OpenSearch 服務。它具有 [opensearch-ruby](https://rubygems.org/gems/opensearch-ruby) 客戶端的所有功能，因為這是此 Gem 套件的相依項目。

執行個體化 Sigv4 簽署者時，請指定 `aoss` 為服務名稱：

```
require 'opensearch-aws-sigv4'
require 'aws-sigv4'

signer = Aws::Sigv4::Signer.new(service: 'aoss',
                                region: 'us-west-2',
                                access_key_id: 'key_id',
                                secret_access_key: 'secret')

# create an opensearch client and use the request-signer
client = OpenSearch::Aws::Sigv4Client.new(
  { host: 'https://your.amz-opensearch-serverless.endpoint',
    log: true },
  signer)

# create an index
index = 'prime'
client.indices.create(index: index)

# insert data
client.index(index: index, id: '1', body: { name: 'Amazon Echo', 
                                            msrp: '5999', 
                                            year: 2011 })

# query the index
client.search(body: { query: { match: { name: 'Echo' } } })

# delete index entry
client.delete(index: index, id: '1')

# delete the index
client.indices.delete(index: index)
```

## 使用其他用戶端簽署 HTTP 請求
<a name="serverless-signing"></a>

當您使用其他用戶端建構 HTTP 請求，並[簽署](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)對 OpenSearch Serverless 集合的請求時，下列要求適用。
+ 您必須將服務名稱指定為 `aoss`。
+ 所有 AWS Signature 版本 4 請求均需要 `x-amz-content-sha256` 標頭。其提供請求承載的雜湊。如果有請求承載，請將值設定為其安全雜湊演算法 (SHA) 加密雜湊 (SHA256)。如果沒有請求承載，請將值設定為 `e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855`，這是一個空字串的雜湊。

**Topics**
+ [使用 cURL 編製索引](#serverless-signing-curl)
+ [使用 Postman 編製索引](#serverless-signing-postman)

### 使用 cURL 編製索引
<a name="serverless-signing-curl"></a>

下列範例請求使用用戶端 URL 請求程式庫 (cURL)，將單一文件傳送至集合`movies-index`中名為 的索引：

```
curl -XPOST \
    --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \
    --aws-sigv4 "aws:amz:us-east-1:aoss" \
    --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \
    --header "x-amz-security-token: $AWS_SESSION_TOKEN" \
    "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \
    -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'
```

### 使用 Postman 編製索引
<a name="serverless-signing-postman"></a>

下圖顯示如何使用 Postman 將請求傳送至集合。如需驗證的指示，請參閱 [Postman 中的使用 AWS 簽章驗證工作流程進行驗證](https://learning.postman.com/docs/sending-requests/authorization/aws-signature/)。

![\[JSON response showing creation of a "movies-index" with successful result and no shards.\]](http://docs.aws.amazon.com/zh_tw/opensearch-service/latest/developerguide/images/ServerlessPostman.png)
