本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將資料導入 Amazon OpenSearch 無伺服器集合
這些章節提供有關支援的擷取管道以將資料擷取至 Amazon OpenSearch 無伺服器集合的詳細資訊。它們還涵蓋了一些您可以用來與 OpenSearch API操作進行交互的客戶端。您的用戶端應與 OpenSearch 2.x 相容,才能與 OpenSearch 無伺服器整合。
所需的最低許可
若要將資料內嵌至 OpenSearch 無伺服器集合,寫入資料的主體必須具有在資料存取原則中指派下列最低權限:
[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/
target-collection
/logs
" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012
:user/my-user
" ] } ]
如果您計劃寫入到其他索引,許可的範圍可能會更廣泛。例如,您可以允許所有索引(index/)的權限,而不是指定單個目標索引target-collection
/*),或索引的子集(索引/target-collection
/logs*
).
如需所有可用 OpenSearch API作業及其相關權限的參考,請參閱Amazon OpenSearch Serverless 中支援的操作和外掛程式。
OpenSearch 攝入
您可以使用 Amazon OpenSearch 擷取,而不是使用第三方用戶端將資料直接傳送到 OpenSearch 無伺服器集合。您可以將資料生產者設定為將資料傳送至 OpenSearch 擷取,它會自動將資料傳送至您指定的集合。您也可以將 OpenSearch 擷取設定為在傳送資料之前轉換資料。如需詳細資訊,請參閱Amazon OpenSearch 擷取。
OpenSearch 擷取管線需要權限,才能寫入設定為其接收器的 OpenSearch 無伺服器集合。這些權限包括描述集合並向其傳送HTTP要求的功能。如需使用 OpenSearch 擷取將資料新增至集合的指示,請參閱授與 Amazon OpenSearch 擷取管道對集合的存取權。
若要開始使用 OpenSearch 擷取,請參閱教學課程:使用 Amazon OpenSearch 擷取將資料擷取到集合。
Fluent Bit
您可以使用 AWS
用於 Fluent Bit 圖像
注意
您必須擁有版本 2.30.0 或更新的版本 AWS 用於流利位元影像,以便與 OpenSearch 無伺服器整合。
範例組態:
組態檔的這個範例輸出區段顯示如何使用 OpenSearch 無伺服器集合作為目的地。重要的補充是 AWS_Service_Name
參數,也就是 aoss
。Host
是集合端點。
[OUTPUT] Name opensearch Match * Host
collection-endpoint
.us-west-2
.aoss.amazonaws.com Port 443 Indexmy_index
Trace_Error On Trace_Output On AWS_Auth On AWS_Region<region>
AWS_Service_Name aoss tls On Suppress_Type_Name On
Amazon 數據 Firehose
Firehose 支援 OpenSearch 無伺服器作為傳送目的地。如需將資料傳送至 OpenSearch 無伺服器的指示,請參閱 Amazon Data Firehose 開發人員指南中的建立 Kinesis Data Firehose 交付串流和為您的目的地選擇 OpenSearch 無伺服器。
您提供給 Firehose 要傳送的IAM角色必須在具有目標集合的aoss:WriteDocument
最低權限的資料存取原則中指定,而且您必須擁有可將資料傳送至其中的已存在索引。如需詳細資訊,請參閱所需的最低許可。
在將資料傳送至 OpenSearch 無伺服器之前,您可能需要對資料執行轉換。如需進一步了解如何使用 Lambda 函數來執行此任務,請參閱相同指南中的 Amazon Kinesis Data Firehose 資料轉換。
Go
下列範例程式碼會使用 Go 的 opensearch-goregion
和 host
的值。
package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }
Java
下列範例程式碼會使用 Java 的 opensearch-javaregion
和 host
的值。
與 OpenSearch 服務域相比,重要的區別在於服務名稱(aoss
而不是es
)。
// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();
下列範例程式碼會再次建立安全連線,然後搜尋索引。
import org.opensearch.client.opensearch.OpenSearchClient; >>>>>>> aoss-slr-update SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();
JavaScript
下列範例程式碼會使用 opensearch-jsnode
和 region
的值。
與 OpenSearch 服務域相比,重要的區別在於服務名稱(aoss
而不是es
)。
Logstash
您可以使用 Logstash OpenSearch 外掛程式
若要使用 Logstash 將資料傳送至無伺服器 OpenSearch
-
使用泊塢視窗或 Linux 安裝logstash-output-opensearch
外掛程式 2.0.0 版或更新版本。 -
若要讓 OpenSearch 輸出外掛程式與 OpenSearch 無伺服器搭配使用,您必須對 logstash.conf 的
opensearch
輸出區段進行下列修改:-
將
aoss
指定為auth_type
下的service_name
。 -
針對
hosts
指定您的集合端點。 -
新增參數
default_server_major_version
和legacy_template
。外掛程式需要這些參數才能與 OpenSearch 無伺服器搭配使用。
output { opensearch { hosts => "
collection-endpoint
:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }此範例組態檔案會從 S3 儲存貯體中的檔案取得輸入,並將其傳送至 OpenSearch 無伺服器集合:
input { s3 { bucket => "
my-s3-bucket
" region => "us-east-1
" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com:443" index =>my-index
auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key
' aws_secret_access_key => 'your-secret-key
' region => 'us-east-1
' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } } -
-
然後,使用新的組態執行 Logstash 來測試外掛程式:
bin/logstash -f config/
test-plugin
.conf
Python
下列範例程式碼會使用 Python 的 opensearch-pyregion
和 host
的值。
與 OpenSearch 服務域相比,重要的區別在於服務名稱(aoss
而不是es
)。
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)
Ruby
opensearch-aws-sigv4
gem 提供對 OpenSearch 無伺服器的存取,以及開箱即用的 OpenSearch 服務。它具有 opensearch-ruby
執行個體化 Sigv4 簽署者時,請指定 aoss
為服務名稱:
require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)
與其他用戶端簽署HTTP要求
當您向其他用戶端建構要求時,將要求簽署至 OpenSearch 無伺服器集合時,適用下列要HTTP求。
-
您必須將服務名稱指定為
aoss
。 -
x-amz-content-sha256
頭是必需的所有 AWS 簽名版本 4 請求。其提供請求承載的雜湊。如果有請求有效負載,請將該值設置為其安全哈希算法(SHA)加密哈希(SHA256)。如果沒有請求承載,請將值設定為e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
,這是一個空字串的雜湊。
用 c 索引 URL
下列範例要求會使用用戶端URL要求程式庫 (cURL),將單一文件傳送至集合movies-index
中指定的索引:
curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:
us-east-1
:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com/movies-index
/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'
與郵遞員索引
下圖顯示了如何使用郵遞員將請求發送到集合。有關進行身份驗證的說明,請參閱驗證 AWS 郵差中的簽名驗證工作流程