使用 SDK for JavaScript (v3) 的 Kinesis 範例 - AWS SDK 程式碼範例

文件 AWS SDK AWS 範例 SDK 儲存庫中有更多可用的 GitHub 範例。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 SDK for JavaScript (v3) 的 Kinesis 範例

下列程式碼範例示範如何搭配 Kinesis 使用 AWS SDK for JavaScript (v3) 來執行動作和實作常見案例。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會示範如何呼叫個別服務函數,但您可以在相關案例中查看內容中的動作。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

動作

下列程式碼範例示範如何使用 PutRecords

SDK for JavaScript (v3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

import { PutRecordsCommand, KinesisClient } from "@aws-sdk/client-kinesis"; /** * Put multiple records into a Kinesis stream. * @param {{ streamArn: string }} config */ export const main = async ({ streamArn }) => { const client = new KinesisClient({}); try { await client.send( new PutRecordsCommand({ StreamARN: streamArn, Records: [ { Data: new Uint8Array(), /** * Determines which shard in the stream the data record is assigned to. * Partition keys are Unicode strings with a maximum length limit of 256 * characters for each key. Amazon Kinesis Data Streams uses the partition * key as input to a hash function that maps the partition key and * associated data to a specific shard. */ PartitionKey: "TEST_KEY", }, { Data: new Uint8Array(), PartitionKey: "TEST_KEY", }, ], }), ); } catch (caught) { if (caught instanceof Error) { // } else { throw caught; } } }; // Call function if run directly. import { fileURLToPath } from "node:url"; import { parseArgs } from "node:util"; if (process.argv[1] === fileURLToPath(import.meta.url)) { const options = { streamArn: { type: "string", description: "The ARN of the stream.", }, }; const { values } = parseArgs({ options }); main(values); }
  • 如需 API 詳細資訊,請參閱 PutRecords AWS SDK for JavaScript 參考中的 API

無伺服器範例

下列程式碼範例示範如何實作 Lambda 函數,該函數接收從 Kinesis 串流接收記錄所觸發的事件。此函數會擷取 Kinesis 承載、從 Base64 解碼,並記錄記錄內容。

SDK for JavaScript (v3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Lambda using JavaScript 使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

使用 Lambda using TypeScript 使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

下列程式碼範例示範如何針對從 Kinesis 串流接收事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

SDK for JavaScript (v3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Javascript 搭配 Lambda 報告 Kinesis 批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

使用 Lambda using TypeScript 報告 Kinesis 批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }