使用 的 Kinesis 範例 AWS SDK for .NET - AWS SDK for .NET

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 的 Kinesis 範例 AWS SDK for .NET

下列程式碼範例示範如何搭配 AWS SDK for .NET Kinesis 使用 來執行動作和實作常見案例。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會示範如何呼叫個別服務函數,但您可以在相關案例中查看內容中的動作。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

動作

下列程式碼範例示範如何使用 AddTagsToStream

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to apply key/value pairs to an Amazon Kinesis /// stream. /// </summary> public class TagStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var tags = new Dictionary<string, string> { { "Project", "Sample Kinesis Project" }, { "Application", "Sample Kinesis App" }, }; var success = await ApplyTagsToStreamAsync(client, streamName, tags); if (success) { Console.WriteLine($"Taggs successfully added to {streamName}."); } else { Console.WriteLine("Tags were not added to the stream."); } } /// <summary> /// Applies the set of tags to the named Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client.</param> /// <param name="streamName">The name of the Kinesis stream to which /// the tags will be attached.</param> /// <param name="tags">A sictionary containing key/value pairs which /// will be used to create the Kinesis tags.</param> /// <returns>A Boolean value which represents the success or failure /// of AddTagsToStreamAsync.</returns> public static async Task<bool> ApplyTagsToStreamAsync( IAmazonKinesis client, string streamName, Dictionary<string, string> tags) { var request = new AddTagsToStreamRequest { StreamName = streamName, Tags = tags, }; var response = await client.AddTagsToStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • 如需API詳細資訊,請參閱 參考 AddTagsToStream中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 CreateStream

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to create a new Amazon Kinesis stream. /// </summary> public class CreateStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; int shardCount = 1; var success = await CreateNewStreamAsync(client, streamName, shardCount); if (success) { Console.WriteLine($"The stream, {streamName} successfully created."); } } /// <summary> /// Creates a new Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client.</param> /// <param name="streamName">The name for the new stream.</param> /// <param name="shardCount">The number of shards the new stream will /// use. The throughput of the stream is a function of the number of /// shards; more shards are required for greater provisioned /// throughput.</param> /// <returns>A Boolean value indicating whether the stream was created.</returns> public static async Task<bool> CreateNewStreamAsync(IAmazonKinesis client, string streamName, int shardCount) { var request = new CreateStreamRequest { StreamName = streamName, ShardCount = shardCount, }; var response = await client.CreateStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • 如需API詳細資訊,請參閱 參考 CreateStream中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 DeleteStream

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to delete an Amazon Kinesis stream. /// </summary> public class DeleteStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var success = await DeleteStreamAsync(client, streamName); if (success) { Console.WriteLine($"Stream, {streamName} successfully deleted."); } else { Console.WriteLine("Stream not deleted."); } } /// <summary> /// Deletes a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the string to delete.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeleteStreamAsync(IAmazonKinesis client, string streamName) { // If EnforceConsumerDeletion is true, any consumers // of this stream will also be deleted. If it is set // to false and this stream has any consumers, the // call will fail with a ResourceInUseException. var request = new DeleteStreamRequest { StreamName = streamName, EnforceConsumerDeletion = true, }; var response = await client.DeleteStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • 如需API詳細資訊,請參閱 參考 DeleteStream中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 DeregisterStreamConsumer

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to deregister a consumer from an Amazon Kinesis stream. /// </summary> public class DeregisterConsumer { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream"; string consumerName = "CONSUMER_NAME"; string consumerARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream/consumer/CONSUMER_NAME:000000000000"; var success = await DeregisterConsumerAsync(client, streamARN, consumerARN, consumerName); if (success) { Console.WriteLine($"{consumerName} successfully deregistered."); } else { Console.WriteLine($"{consumerName} was not successfully deregistered."); } } /// <summary> /// Deregisters a consumer from a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of a Kinesis stream.</param> /// <param name="consumerARN">The ARN of the consumer.</param> /// <param name="consumerName">The name of the consumer.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeregisterConsumerAsync( IAmazonKinesis client, string streamARN, string consumerARN, string consumerName) { var request = new DeregisterStreamConsumerRequest { StreamARN = streamARN, ConsumerARN = consumerARN, ConsumerName = consumerName, }; var response = await client.DeregisterStreamConsumerAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

下列程式碼範例示範如何使用 ListStreamConsumers

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// List the consumers of an Amazon Kinesis stream. /// </summary> public class ListConsumers { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; int maxResults = 10; var consumers = await ListConsumersAsync(client, streamARN, maxResults); if (consumers.Count > 0) { consumers .ForEach(c => Console.WriteLine($"Name: {c.ConsumerName} ARN: {c.ConsumerARN}")); } else { Console.WriteLine("No consumers found."); } } /// <summary> /// Retrieve a list of the consumers for a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of the stream for which we want to /// retrieve a list of clients.</param> /// <param name="maxResults">The maximum number of results to return.</param> /// <returns>A list of Consumer objects.</returns> public static async Task<List<Consumer>> ListConsumersAsync(IAmazonKinesis client, string streamARN, int maxResults) { var request = new ListStreamConsumersRequest { StreamARN = streamARN, MaxResults = maxResults, }; var response = await client.ListStreamConsumersAsync(request); return response.Consumers; } }

下列程式碼範例示範如何使用 ListStreams

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Retrieves and displays a list of existing Amazon Kinesis streams. /// </summary> public class ListStreams { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); var response = await client.ListStreamsAsync(new ListStreamsRequest()); List<string> streamNames = response.StreamNames; if (streamNames.Count > 0) { streamNames .ForEach(s => Console.WriteLine($"Stream name: {s}")); } else { Console.WriteLine("No streams were found."); } } }
  • 如需API詳細資訊,請參閱 參考 ListStreams中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 ListTagsForStream

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to list the tags that have been attached to an Amazon Kinesis /// stream. /// </summary> public class ListTags { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; await ListTagsAsync(client, streamName); } /// <summary> /// List the tags attached to a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the Kinesis stream for which you /// wish to display tags.</param> public static async Task ListTagsAsync(IAmazonKinesis client, string streamName) { var request = new ListTagsForStreamRequest { StreamName = streamName, Limit = 10, }; var response = await client.ListTagsForStreamAsync(request); DisplayTags(response.Tags); while (response.HasMoreTags) { request.ExclusiveStartTagKey = response.Tags[response.Tags.Count - 1].Key; response = await client.ListTagsForStreamAsync(request); } } /// <summary> /// Displays the items in a list of Kinesis tags. /// </summary> /// <param name="tags">A list of the Tag objects to be displayed.</param> public static void DisplayTags(List<Tag> tags) { tags .ForEach(t => Console.WriteLine($"Key: {t.Key} Value: {t.Value}")); } }
  • 如需API詳細資訊,請參閱 參考 ListTagsForStream中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 RegisterStreamConsumer

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to register a consumer to an Amazon Kinesis /// stream. /// </summary> public class RegisterConsumer { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string consumerName = "NEW_CONSUMER_NAME"; string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; var consumer = await RegisterConsumerAsync(client, consumerName, streamARN); if (consumer is not null) { Console.WriteLine($"{consumer.ConsumerName}"); } } /// <summary> /// Registers the consumer to a Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client object.</param> /// <param name="consumerName">A string representing the consumer.</param> /// <param name="streamARN">The ARN of the stream.</param> /// <returns>A Consumer object that contains information about the consumer.</returns> public static async Task<Consumer> RegisterConsumerAsync(IAmazonKinesis client, string consumerName, string streamARN) { var request = new RegisterStreamConsumerRequest { ConsumerName = consumerName, StreamARN = streamARN, }; var response = await client.RegisterStreamConsumerAsync(request); return response.Consumer; } }

無伺服器範例

下列程式碼範例示範如何實作 Lambda 函數,該函數接收從 Kinesis 串流接收記錄所觸發的事件。此函數會擷取 Kinesis 承載、從 Base64 解碼,並記錄記錄內容。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 使用 Lambda 來使用 Kinesis 事件NET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }

下列程式碼範例示範如何針對從 Kinesis 串流接收事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 報告使用 Lambda 的 Kinesis 批次項目失敗NET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }