Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Implémenter le producteur
Ce didacticiel utilise le scénario réel de surveillance des opérations boursières. Les principes suivants expliquent brièvement comment ce scénario est mis en correspondance avec l’application producteur et la structure de code associée.
Reportez-vous au code source
- StockTrade classe
-
Une transaction boursière individuelle est représentée par une instance de la StockTrade classe. Cette instance contient des attributs tels que le symbole boursier, le prix, le nombre d'actions, le type de l'opération (achat ou vente) et un ID identifiant l'opération de manière unique. Cette classe est implémentée pour vous.
- Enregistrement de flux
-
Un flux est une séquence d'enregistrements. Un enregistrement est une sérialisation d'une
StockTrade
instance au JSON format. Par exemple :{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator classe
-
StockTradeGenerator possède une méthode appelée
getRandomTrade()
qui renvoie une nouvelle transaction boursière générée de manière aléatoire chaque fois qu'elle est invoquée. Cette classe est implémentée pour vous. - StockTradesWriter classe
-
La
main
méthode du producteur récupère StockTradesWriter en permanence une transaction aléatoire, puis l'envoie à Kinesis Data Streams en effectuant les tâches suivantes :-
Lit le nom du flux de données et le nom de la région en entrée.
-
Utilise le
KinesisAsyncClientBuilder
pour définir la région, les informations d'identification et la configuration du client. -
Elle vérifie que le flux existe et qu'il est actif (sinon il se ferme et génère une erreur).
-
Dans une boucle continue, elle appelle la méthode
StockTradeGenerator.getRandomTrade()
, puis la méthodesendStockTrade
pour envoyer l'opération boursière au flux toutes les 100 millisecondes.
La méthode
sendStockTrade
de la classeStockTradesWriter
comprend le code suivant :private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }
Reportez-vous à la ventilation de code suivante :
-
Le
PutRecord
API attend un tableau d'octets, et vous devez convertir le trade en JSON format. La seule ligne de code suivante effectue cette opération :byte[] bytes = trade.toJsonAsBytes();
-
Avant d’envoyer l'opération, vous créez une nouvelle instance
PutRecordRequest
(appelée request dans le cas suivant) : Chaque appel derequest
requiert le nom du flux, la clé de partition et un blob de données.PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();
L'exemple utilise un ticker boursier comme clé de partition, qui fait correspondre l'enregistrement à une partition spécifique. En pratique, vous devez avoir des centaines ou des milliers de clés de partition par partition afin de répartir les enregistrements de façon égale dans votre flux. Pour plus d'informations sur la façon d'ajouter des données à un flux, consultez la pageÉcrire des données dans Amazon Kinesis Data Streams.
Maintenant
request
est prêt à envoyer au client (l'opération put) :kinesisClient.putRecord(request).get();
-
La vérification des erreurs et leur consignation sont toujours très utiles. Le code suivant consigne les conditions d'erreur :
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Ajoutez le bloc try/catch autour de l'opération
put
:try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }
Une opération put de Kinesis Data Streams peut échouer en raison d’une erreur réseau ou si le flux de données atteint ses limites de débit. Il est recommandé d'examiner attentivement votre politique de nouvelle tentative pour les
put
opérations afin d'éviter toute perte de données, par exemple en utilisant une nouvelle tentative. -
La journalisation d'état est très utile, mais elle est facultative :
LOG.info("Putting trade: " + trade.toString());
Le producteur présenté ici utilise la fonctionnalité
PutRecord
d'enregistrement unique de Kinesis Data API Streams,. En pratique, si une application producteur génère de nombreux enregistrements, il est souvent plus efficace d'utiliser la fonctionnalité Plusieurs enregistrements dePutRecords
et d'envoyer les lots d'enregistrements en même temps. Pour de plus amples informations, veuillez consulter Écrire des données dans Amazon Kinesis Data Streams. -
Pour exécuter l'application producteur
-
Vérifiez que la clé d'accès et la paire de clés secrètes récupérées dans Création d'une IAM politique et d'un utilisateur sont enregistrées dans le fichier
~/.aws/credentials
. -
Exécutez la classe
StockTradeWriter
avec les arguments suivants :StockTradeStream us-west-2
Si vous avez créé votre flux dans une région autre que
us-west-2
, vous devez spécifier cette région ici.
Vous devez voir des résultats similaires à ce qui suit :
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Vos transactions boursières sont maintenant ingérées par Kinesis Data Streams.
Étapes suivantes
Mettre en œuvre le consommateur