SDK for JavaScript (v3) を使用した Kinesis の例 - AWS SDKコードの例

Doc AWS SDK ExamplesWord リポジトリには、さらに多くの GitHub の例があります。 AWS SDK

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SDK for JavaScript (v3) を使用した Kinesis の例

次のコード例は、Kinesis で AWS SDK for JavaScript (v3) を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています。

アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、コンテキスト内のアクションは、関連するシナリオで確認できます。

各例には、完全なソースコードへのリンクが含まれています。ここでは、コンテキストでコードを設定および実行する方法の手順を確認できます。

アクション

次の例は、PutRecords を使用する方法を説明しています。

SDK(v3) の JavaScript
注記

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

import { PutRecordsCommand, KinesisClient } from "@aws-sdk/client-kinesis"; /** * Put multiple records into a Kinesis stream. * @param {{ streamArn: string }} config */ export const main = async ({ streamArn }) => { const client = new KinesisClient({}); try { await client.send( new PutRecordsCommand({ StreamARN: streamArn, Records: [ { Data: new Uint8Array(), /** * Determines which shard in the stream the data record is assigned to. * Partition keys are Unicode strings with a maximum length limit of 256 * characters for each key. Amazon Kinesis Data Streams uses the partition * key as input to a hash function that maps the partition key and * associated data to a specific shard. */ PartitionKey: "TEST_KEY", }, { Data: new Uint8Array(), PartitionKey: "TEST_KEY", }, ], }), ); } catch (caught) { if (caught instanceof Error) { // } else { throw caught; } } }; // Call function if run directly. import { fileURLToPath } from "node:url"; import { parseArgs } from "node:util"; if (process.argv[1] === fileURLToPath(import.meta.url)) { const options = { streamArn: { type: "string", description: "The ARN of the stream.", }, }; const { values } = parseArgs({ options }); main(values); }
  • API の詳細については、PutRecords AWS SDK for JavaScript リファレンスの API を参照してください。

サーバーレスサンプル

次のコード例では、Kinesis ストリームからレコードを受信することによってトリガーされるイベントを受け取る、Lambda 関数の実装方法を示しています。この関数は Kinesis ペイロードを取得し、それを Base64 からデコードして、そのレコードの内容をログ記録します。

SDK(v3) の JavaScript
注記

GitHub には他にもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

JavaScript を使用して Lambda で Kinesis イベントを消費する。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

TypeScript を使用して Lambda で Kinesis イベントを消費する。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

以下のコード例では、Kinesis ストリームからイベントを受け取る Lambda 関数のための、部分的なバッチレスポンスの実装方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。

SDK(v3) の JavaScript
注記

GitHub には他にもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Javascript を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

TypeScript を使用して Lambda で Kinesis バッチ項目の障害を報告する。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }