Exemples d'utilisation de Kinesis AWS SDK for .NET - Exemples de code de l'AWS SDK

D'autres AWS SDK exemples sont disponibles dans le GitHub dépôt AWS Doc SDK Examples.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemples d'utilisation de Kinesis AWS SDK for .NET

Les exemples de code suivants vous montrent comment effectuer des actions et implémenter des scénarios courants à l' AWS SDK for .NET aide de Winesis.

Les actions sont des extraits de code de programmes plus larges et doivent être exécutées dans leur contexte. Alors que les actions vous montrent comment appeler des fonctions de service individuelles, vous pouvez les visualiser dans leur contexte dans leurs scénarios associés.

Chaque exemple inclut un lien vers le code source complet, où vous trouverez des instructions sur la façon de configurer et d'exécuter le code en contexte.

Actions

L'exemple de code suivant montre comment utiliserAddTagsToStream.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to apply key/value pairs to an Amazon Kinesis /// stream. /// </summary> public class TagStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var tags = new Dictionary<string, string> { { "Project", "Sample Kinesis Project" }, { "Application", "Sample Kinesis App" }, }; var success = await ApplyTagsToStreamAsync(client, streamName, tags); if (success) { Console.WriteLine($"Taggs successfully added to {streamName}."); } else { Console.WriteLine("Tags were not added to the stream."); } } /// <summary> /// Applies the set of tags to the named Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client.</param> /// <param name="streamName">The name of the Kinesis stream to which /// the tags will be attached.</param> /// <param name="tags">A sictionary containing key/value pairs which /// will be used to create the Kinesis tags.</param> /// <returns>A Boolean value which represents the success or failure /// of AddTagsToStreamAsync.</returns> public static async Task<bool> ApplyTagsToStreamAsync( IAmazonKinesis client, string streamName, Dictionary<string, string> tags) { var request = new AddTagsToStreamRequest { StreamName = streamName, Tags = tags, }; var response = await client.AddTagsToStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • Pour API plus de détails, voir AddTagsToStreamla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserCreateStream.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to create a new Amazon Kinesis stream. /// </summary> public class CreateStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; int shardCount = 1; var success = await CreateNewStreamAsync(client, streamName, shardCount); if (success) { Console.WriteLine($"The stream, {streamName} successfully created."); } } /// <summary> /// Creates a new Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client.</param> /// <param name="streamName">The name for the new stream.</param> /// <param name="shardCount">The number of shards the new stream will /// use. The throughput of the stream is a function of the number of /// shards; more shards are required for greater provisioned /// throughput.</param> /// <returns>A Boolean value indicating whether the stream was created.</returns> public static async Task<bool> CreateNewStreamAsync(IAmazonKinesis client, string streamName, int shardCount) { var request = new CreateStreamRequest { StreamName = streamName, ShardCount = shardCount, }; var response = await client.CreateStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • Pour API plus de détails, voir CreateStreamla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserDeleteStream.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to delete an Amazon Kinesis stream. /// </summary> public class DeleteStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var success = await DeleteStreamAsync(client, streamName); if (success) { Console.WriteLine($"Stream, {streamName} successfully deleted."); } else { Console.WriteLine("Stream not deleted."); } } /// <summary> /// Deletes a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the string to delete.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeleteStreamAsync(IAmazonKinesis client, string streamName) { // If EnforceConsumerDeletion is true, any consumers // of this stream will also be deleted. If it is set // to false and this stream has any consumers, the // call will fail with a ResourceInUseException. var request = new DeleteStreamRequest { StreamName = streamName, EnforceConsumerDeletion = true, }; var response = await client.DeleteStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
  • Pour API plus de détails, voir DeleteStreamla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserDeregisterStreamConsumer.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to deregister a consumer from an Amazon Kinesis stream. /// </summary> public class DeregisterConsumer { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream"; string consumerName = "CONSUMER_NAME"; string consumerARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream/consumer/CONSUMER_NAME:000000000000"; var success = await DeregisterConsumerAsync(client, streamARN, consumerARN, consumerName); if (success) { Console.WriteLine($"{consumerName} successfully deregistered."); } else { Console.WriteLine($"{consumerName} was not successfully deregistered."); } } /// <summary> /// Deregisters a consumer from a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of a Kinesis stream.</param> /// <param name="consumerARN">The ARN of the consumer.</param> /// <param name="consumerName">The name of the consumer.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeregisterConsumerAsync( IAmazonKinesis client, string streamARN, string consumerARN, string consumerName) { var request = new DeregisterStreamConsumerRequest { StreamARN = streamARN, ConsumerARN = consumerARN, ConsumerName = consumerName, }; var response = await client.DeregisterStreamConsumerAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

L'exemple de code suivant montre comment utiliserListStreamConsumers.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// List the consumers of an Amazon Kinesis stream. /// </summary> public class ListConsumers { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; int maxResults = 10; var consumers = await ListConsumersAsync(client, streamARN, maxResults); if (consumers.Count > 0) { consumers .ForEach(c => Console.WriteLine($"Name: {c.ConsumerName} ARN: {c.ConsumerARN}")); } else { Console.WriteLine("No consumers found."); } } /// <summary> /// Retrieve a list of the consumers for a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of the stream for which we want to /// retrieve a list of clients.</param> /// <param name="maxResults">The maximum number of results to return.</param> /// <returns>A list of Consumer objects.</returns> public static async Task<List<Consumer>> ListConsumersAsync(IAmazonKinesis client, string streamARN, int maxResults) { var request = new ListStreamConsumersRequest { StreamARN = streamARN, MaxResults = maxResults, }; var response = await client.ListStreamConsumersAsync(request); return response.Consumers; } }
  • Pour API plus de détails, voir ListStreamConsumersla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserListStreams.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Retrieves and displays a list of existing Amazon Kinesis streams. /// </summary> public class ListStreams { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); var response = await client.ListStreamsAsync(new ListStreamsRequest()); List<string> streamNames = response.StreamNames; if (streamNames.Count > 0) { streamNames .ForEach(s => Console.WriteLine($"Stream name: {s}")); } else { Console.WriteLine("No streams were found."); } } }
  • Pour API plus de détails, voir ListStreamsla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserListTagsForStream.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to list the tags that have been attached to an Amazon Kinesis /// stream. /// </summary> public class ListTags { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; await ListTagsAsync(client, streamName); } /// <summary> /// List the tags attached to a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the Kinesis stream for which you /// wish to display tags.</param> public static async Task ListTagsAsync(IAmazonKinesis client, string streamName) { var request = new ListTagsForStreamRequest { StreamName = streamName, Limit = 10, }; var response = await client.ListTagsForStreamAsync(request); DisplayTags(response.Tags); while (response.HasMoreTags) { request.ExclusiveStartTagKey = response.Tags[response.Tags.Count - 1].Key; response = await client.ListTagsForStreamAsync(request); } } /// <summary> /// Displays the items in a list of Kinesis tags. /// </summary> /// <param name="tags">A list of the Tag objects to be displayed.</param> public static void DisplayTags(List<Tag> tags) { tags .ForEach(t => Console.WriteLine($"Key: {t.Key} Value: {t.Value}")); } }
  • Pour API plus de détails, voir ListTagsForStreamla section AWS SDK for .NET APIRéférence.

L'exemple de code suivant montre comment utiliserRegisterStreamConsumer.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to register a consumer to an Amazon Kinesis /// stream. /// </summary> public class RegisterConsumer { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string consumerName = "NEW_CONSUMER_NAME"; string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; var consumer = await RegisterConsumerAsync(client, consumerName, streamARN); if (consumer is not null) { Console.WriteLine($"{consumer.ConsumerName}"); } } /// <summary> /// Registers the consumer to a Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client object.</param> /// <param name="consumerName">A string representing the consumer.</param> /// <param name="streamARN">The ARN of the stream.</param> /// <returns>A Consumer object that contains information about the consumer.</returns> public static async Task<Consumer> RegisterConsumerAsync(IAmazonKinesis client, string consumerName, string streamARN) { var request = new RegisterStreamConsumerRequest { ConsumerName = consumerName, StreamARN = streamARN, }; var response = await client.RegisterStreamConsumerAsync(request); return response.Consumer; } }

Exemples sans serveur

L'exemple de code suivant montre comment implémenter une fonction Lambda qui reçoit un événement déclenché par la réception d'enregistrements provenant d'un flux Kinesis. La fonction récupère la charge utile Kinesis, décode à partir de Base64 et enregistre le contenu de l’enregistrement.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Utilisation d'un événement Kinesis avec Lambda à l'aide de. NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }

L'exemple de code suivant montre comment implémenter une réponse par lots partielle pour les fonctions Lambda qui reçoivent des événements d'un flux Kinesis. La fonction signale les défaillances échecs d’articles par lots dans la réponse, en indiquant à Lambda de réessayer ces messages ultérieurement.

AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot Kinesis avec Lambda à l'aide de. NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }