Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Amazon Kinesis
En utilisant Service géré Amazon pour Apache Flink
Vous pouvez envoyer des données de Kinesis Data Streams à Timestream LiveAnalytics pour utiliser le connecteur de données d' Timestream exemple pour Managed Service for Apache Flink. Reportez-vous Service géré Amazon pour Apache Flink à Apache Flink pour plus d'informations.
Utilisation de EventBridge Pipes pour envoyer des données Kinesis à Timestream
Vous pouvez utiliser EventBridge Pipes pour envoyer des données d'un flux Kinesis vers une table Amazon Timestream for. LiveAnalytics
Les tubes sont destinés aux point-to-point intégrations entre les sources et les cibles prises en charge, avec la prise en charge des transformations avancées et de l'enrichissement. Pipes réduit le besoin de connaissances spécialisées et de code d'intégration lors du développement d'architectures pilotées par les événements. Pour configurer un canal, vous devez choisir la source, ajouter un filtrage facultatif, définir un enrichissement facultatif et choisir la cible pour les données d'événement.
Cette intégration vous permet de tirer parti de la puissance des capacités Timestream d'analyse des séries chronologiques de données, tout en simplifiant votre pipeline d'ingestion de données.
L'utilisation de EventBridge Pipes with Timestream offre les avantages suivants :
Ingestion des données en temps réel : diffusez les données de Kinesis directement vers Timestream pour permettre des analyses et une LiveAnalytics surveillance en temps réel.
Intégration fluide : utilisez EventBridge Pipes pour gérer le flux de données sans avoir besoin d'intégrations personnalisées complexes.
Filtrage et transformation améliorés : filtrez ou transformez les enregistrements Kinesis avant leur stockage afin de répondre Timestream à vos besoins spécifiques en matière de traitement des données.
Évolutivité : gérez des flux de données à haut débit et garantissez un traitement efficace des données grâce aux fonctionnalités intégrées de parallélisme et de traitement par lots.
Configuration
Pour configurer un EventBridge canal vers lequel diffuser des données depuis Kinesis Timestream, procédez comme suit :
Créez un flux Kinesis
Assurez-vous de disposer d'un flux de données Kinesis actif à partir duquel vous souhaitez ingérer des données.
Création d'une Timestream base de données et d'une table
Configurez votre Timestream base de données et votre table dans laquelle les données seront stockées.
Configurez le EventBridge tuyau :
Source : sélectionnez votre flux Kinesis comme source.
Cible : Choisissez Timestream comme cible.
Paramètres de traitement par lots : définissez la fenêtre de traitement par lots et la taille du lot pour optimiser le traitement des données et réduire la latence.
Important
Lors de la configuration d'un canal, nous vous recommandons de tester l'exactitude de toutes les configurations en ingérant quelques enregistrements. Veuillez noter que la création réussie d'un tuyau ne garantit pas que le pipeline est correct et que les données circuleront sans erreur. Certaines erreurs d'exécution, telles qu'une table incorrecte, un paramètre de chemin dynamique incorrect ou un Timestream enregistrement non valide après l'application du mappage, seront découvertes lorsque les données réelles circuleront dans le canal.
Les configurations suivantes déterminent la vitesse à laquelle les données sont ingérées :
BatchSize: taille maximale du lot qui sera envoyé à Timestream pour. LiveAnalytics Plage : 0 à 100. Il est recommandé de conserver cette valeur à 100 pour obtenir un débit maximal.
MaximumBatchingWindowInSeconds: le temps d'attente maximal batchSize avant que le lot ne soit envoyé à Timestream pour LiveAnalytics la cible. En fonction du taux d'événements entrants, cette configuration décidera du délai d'ingestion. Il est recommandé de conserver cette valeur < 10 s pour continuer à envoyer les données Timestream en temps quasi réel.
ParallelizationFactor: le nombre de lots à traiter simultanément à partir de chaque partition. Il est recommandé d'utiliser la valeur maximale de 10 pour obtenir un débit maximal et une ingestion en temps quasi réel.
Si votre flux est lu par plusieurs cibles, utilisez un ventilateur amélioré pour fournir un consommateur dédié à votre chaîne afin d'atteindre un débit élevé. Pour plus d'informations, consultez la section Développement d'utilisateurs optimisés grâce Kinesis Data Streams API au guide de l'Kinesis Data Streams utilisateur.
Note
Le débit maximal pouvant être atteint est limité par les exécutions de canaux simultanées par compte.
La configuration suivante garantit la prévention des pertes de données :
DeadLetterConfig: Il est recommandé de toujours configurer DeadLetterConfig pour éviter toute perte de données dans les cas où les événements n'ont pas pu être ingérés dans Timestream LiveAnalytics en raison d'erreurs utilisateur.
Optimisez les performances de votre canal à l'aide des paramètres de configuration suivants, qui permettent d'éviter que les enregistrements ne provoquent des ralentissements ou des blocages.
MaximumRecordAgeInSeconds: Les enregistrements plus anciens ne seront pas traités et seront directement déplacés versDLQ. Nous recommandons de définir cette valeur pour qu'elle ne soit pas supérieure à la période de rétention de mémoire configurée pour la Timestream table cible.
MaximumRetryAttempts: nombre de nouvelles tentatives pour un enregistrement avant que celui-ci ne soit envoyé à DeadLetterQueue. Il est recommandé de le configurer à 10. Cela devrait permettre de résoudre les problèmes transitoires. En cas de problème persistant, l'enregistrement sera déplacé vers le reste du flux DeadLetterQueue et débloquera le reste du flux.
OnPartialBatchItemFailure: Pour les sources qui prennent en charge le traitement partiel par lots, nous vous recommandons de l'activer et de le configurer sous la forme AUTOMATIC _ BISECT pour réessayer les enregistrements ayant échoué avant de les déposer/de les envoyer à. DLQ
Exemple de configuration
Voici un exemple de configuration d'un EventBridge canal pour diffuser les données d'un flux Kinesis vers une Timestream table :
Exemple IAM mises à jour des politiques pour Timestream
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:WriteRecords" ], "Resource": [ "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table" ] }, { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints" ], "Resource": "*" } ] }
Exemple Configuration du flux Kinesis
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
Exemple Timestream configuration cible
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }
Transformation de l'événement
EventBridge Les canaux vous permettent de transformer les données avant qu'elles ne les atteignent Timestream. Vous pouvez définir des règles de transformation pour modifier les Kinesis enregistrements entrants, par exemple en modifiant les noms de champs.
Supposons que votre Kinesis flux contienne des données de température et d'humidité. Vous pouvez utiliser une EventBridge transformation pour renommer ces champs avant de les y insérer. Timestream
Bonnes pratiques
Traitement par lots et mise en mémoire tampon
Configurez la fenêtre et la taille de traitement par lots afin d'équilibrer la latence d'écriture et l'efficacité du traitement.
Utilisez une fenêtre de traitement par lots pour accumuler suffisamment de données avant le traitement, ce qui réduit les frais liés aux petits lots fréquents.
Traitement parallèle
Utilisez ce ParallelizationFactorparamètre pour augmenter la simultanéité, en particulier pour les flux à haut débit. Cela garantit que plusieurs lots de chaque partition peuvent être traités simultanément.
Transformation des données
Tirez parti des capacités de transformation de EventBridge Pipes pour filtrer et améliorer les enregistrements avant de les stocker Timestream. Cela peut vous aider à aligner les données sur vos exigences analytiques.
Sécurité
Assurez-vous que les IAM rôles utilisés pour EventBridge Pipes disposent des autorisations nécessaires pour lire Kinesis et écrire Timestream.
Utilisez des mesures de cryptage et de contrôle d'accès pour sécuriser les données en transit et au repos.
Défaillances de débogage
-
Désactivation automatique des canalisations
Les canaux seront automatiquement désactivés dans environ 2 heures si la cible n'existe pas ou présente des problèmes d'autorisation
-
Throttles
Les tuyaux ont la capacité de s'arrêter automatiquement et de réessayer jusqu'à ce que les gaz soient réduits.
-
Activation des journaux
Nous vous recommandons d'activer les journaux au ERROR niveau 1 et d'inclure les données d'exécution pour mieux comprendre les échecs. En cas de panne, ces journaux contiendront des informations request/response sent/received de Timestream. Cela vous permet de comprendre l'erreur associée et, si nécessaire, de retraiter les enregistrements après l'avoir corrigée.
Surveillance
Nous vous recommandons de configurer des alarmes sur les points suivants afin de détecter tout problème lié au flux de données :
Âge maximal de l'enregistrement dans la source
GetRecords.IteratorAgeMilliseconds
Mesures de défaillance dans les canalisations
ExecutionFailed
TargetStageFailed
Timestream APIErreurs d'écriture
UserErrors
Pour des mesures de surveillance supplémentaires, consultez la section Surveillance EventBridge dans le guide de EventBridge l'utilisateur.