將資料導入 Amazon OpenSearch 無伺服器集合 - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將資料導入 Amazon OpenSearch 無伺服器集合

這些章節提供有關支援的擷取管道以將資料擷取至 Amazon OpenSearch 無伺服器集合的詳細資訊。它們還涵蓋了一些您可以用來與 OpenSearch API操作進行交互的客戶端。您的用戶端應與 OpenSearch 2.x 相容,才能與 OpenSearch 無伺服器整合。

所需的最低許可

若要將資料內嵌至 OpenSearch 無伺服器集合,寫入資料的主體必須具有在資料取原則中指派下列最低權限:

[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/target-collection/logs" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012:user/my-user" ] } ]

如果您計劃寫入到其他索引,許可的範圍可能會更廣泛。例如,您可以允許所有索引(index/)的權限,而不是指定單個目標索引target-collection/*),或索引的子集(索引/target-collection/logs*).

如需所有可用 OpenSearch API作業及其相關權限的參考,請參閱Amazon OpenSearch Serverless 中支援的操作和外掛程式

OpenSearch 攝入

您可以使用 Amazon OpenSearch 擷取,而不是使用第三方用戶端將資料直接傳送到 OpenSearch 無伺服器集合。您可以將資料生產者設定為將資料傳送至 OpenSearch 擷取,它會自動將資料傳送至您指定的集合。您也可以將 OpenSearch 擷取設定為在傳送資料之前轉換資料。如需詳細資訊,請參閱Amazon OpenSearch 擷取

OpenSearch 擷取管線需要權限,才能寫入設定為其接收器的 OpenSearch 無伺服器集合。這些權限包括描述集合並向其傳送HTTP要求的功能。如需使用 OpenSearch 擷取將資料新增至集合的指示,請參閱授與 Amazon OpenSearch 擷取管道對集合的存取權

若要開始使用 OpenSearch 擷取,請參閱教學課程:使用 Amazon OpenSearch 擷取將資料擷取到集合

Fluent Bit

您可以使用 AWS 用於 Fluent Bit 圖像OpenSearch 輸出插件,以將數據導入 OpenSearch 無服務器集合。

注意

您必須擁有版本 2.30.0 或更新的版本 AWS 用於流利位元影像,以便與 OpenSearch 無伺服器整合。

範例組態

組態檔的這個範例輸出區段顯示如何使用 OpenSearch 無伺服器集合作為目的地。重要的補充是 AWS_Service_Name 參數,也就是 aossHost 是集合端點。

[OUTPUT] Name opensearch Match * Host collection-endpoint.us-west-2.aoss.amazonaws.com Port 443 Index my_index Trace_Error On Trace_Output On AWS_Auth On AWS_Region <region> AWS_Service_Name aoss tls On Suppress_Type_Name On

Amazon 數據 Firehose

Firehose 支援 OpenSearch 無伺服器作為傳送目的地。如需將資料傳送至 OpenSearch 無伺服器的指示,請參閱 Amazon Data Firehose 開發人員指南中的建立 Kinesis Data Firehose 交付串流為您的目的地選擇 OpenSearch 無伺服器

您提供給 Firehose 要傳送的IAM角色必須在具有目標集合的aoss:WriteDocument最低權限的資料存取原則中指定,而且您必須擁有可將資料傳送至其中的已存在索引。如需詳細資訊,請參閱所需的最低許可

在將資料傳送至 OpenSearch 無伺服器之前,您可能需要對資料執行轉換。如需進一步了解如何使用 Lambda 函數來執行此任務,請參閱相同指南中的 Amazon Kinesis Data Firehose 資料轉換

Go

下列範例程式碼會使用 Go 的 opensearch-go 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }

Java

下列範例程式碼會使用 Java 的 opensearch-java 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();

下列範例程式碼會再次建立安全連線,然後搜尋索引。

import org.opensearch.client.opensearch.OpenSearchClient; >>>>>>> aoss-slr-update SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();

JavaScript

下列範例程式碼會使用 opensearch-js 用戶端 JavaScript 來建立與指定的 OpenSearch 無伺服器集合的安全連線、建立單一索引、新增文件,以及刪除索引。您必須提供 noderegion 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

Version 3

這個範例 JavaScript 在 Node.js 中使用的版本 3。SDK

const { defaultProvider } = require('@aws-sdk/credential-provider-node'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => { const credentialsProvider = defaultProvider(); return credentialsProvider(); }, }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();
Version 2

這個範例 JavaScript 在 Node.js 中使用的版本 2。SDK

const AWS = require('aws-sdk'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => new Promise((resolve, reject) => { AWS.config.getCredentials((err, credentials) => { if (err) { reject(err); } else { resolve(credentials); } }); }), }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();

Logstash

您可以使用 Logstash OpenSearch 外掛程式將記錄發佈至 OpenSearch 無伺服器集合。

若要使用 Logstash 將資料傳送至無伺服器 OpenSearch
  1. 使用泊塢視窗或 Linux 安裝logstash-output-opensearch外掛程式 2.0.0 版或更新版本。

    Docker

    碼頭窗承載預先安裝輸出插件的 Logstash OSS 軟件:開放搜索項目/- OpenSearch 輸出插件。logstash-oss-with-opensearch您可以像任何其他映像一樣提取映像:

    docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    Linux

    首先,如果您尚未安裝最新版本的 Logstash,請先安裝。然後,安裝輸出外掛程式的 2.0.0 版本:

    cd logstash-8.5.0/ bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch

    如果外掛程式已安裝,請將其更新至最新版本:

    bin/logstash-plugin update logstash-output-opensearch

    從外掛程式的 2.0.0 版開始, AWS SDK使用版本 3。如果您使用的是 8.4.0 之前的 Logstash 版本,則必須移除任何預先安裝的版本 AWS 插件和安裝logstash-integration-aws插件:

    /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
  2. 若要讓 OpenSearch 輸出外掛程式與 OpenSearch 無伺服器搭配使用,您必須對 logstash.conf 的opensearch輸出區段進行下列修改:

    • aoss 指定為 auth_type 下的 service_name

    • 針對 hosts 指定您的集合端點。

    • 新增參數 default_server_major_versionlegacy_template。外掛程式需要這些參數才能與 OpenSearch 無伺服器搭配使用。

    output { opensearch { hosts => "collection-endpoint:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }

    此範例組態檔案會從 S3 儲存貯體中的檔案取得輸入,並將其傳送至 OpenSearch 無伺服器集合:

    input { s3 { bucket => "my-s3-bucket" region => "us-east-1" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443" index => my-index auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key' aws_secret_access_key => 'your-secret-key' region => 'us-east-1' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }
  3. 然後,使用新的組態執行 Logstash 來測試外掛程式:

    bin/logstash -f config/test-plugin.conf

Python

下列範例程式碼會使用 Python 的 opensearch-py 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線、建立單一索引,然後搜尋該索引。您必須提供 regionhost 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)

Ruby

opensearch-aws-sigv4gem 提供對 OpenSearch 無伺服器的存取,以及開箱即用的 OpenSearch 服務。它具有 opensearch-ruby 客戶端的所有功能,因為這是此 Gem 套件的相依項目。

執行個體化 Sigv4 簽署者時,請指定 aoss 為服務名稱:

require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)

與其他用戶端簽署HTTP要求

當您向其他用戶端建構要求時,將要求簽署至 OpenSearch 無伺服器集合時,適用下列要HTTP求。

  • 您必須將服務名稱指定為 aoss

  • x-amz-content-sha256頭是必需的所有 AWS 簽名版本 4 請求。其提供請求承載的雜湊。如果有請求有效負載,請將該值設置為其安全哈希算法(SHA)加密哈希(SHA256)。如果沒有請求承載,請將值設定為 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855,這是一個空字串的雜湊。

用 c 索引 URL

下列範例要求會使用用戶端URL要求程式庫 (cURL),將單一文件傳送至集合movies-index中指定的索引:

curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:us-east-1:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'

與郵遞員索引

下圖顯示了如何使用郵遞員將請求發送到集合。有關進行身份驗證的說明,請參閱驗證 AWS 郵差中的簽名驗證工作流程

JSON response showing creation of a "movies-index" with successful result and no shards.