Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Implementasikan produsen
Tutorial ini menggunakan skenario dunia nyata pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukungnya.
Lihat kode sumber
- StockTrade kelas
-
Perdagangan saham individu diwakili oleh contoh StockTrade kelas. Contoh ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diterapkan untuk Anda.
- Rekam aliran
-
Aliran adalah urutan catatan. Rekaman adalah serialisasi
StockTrade
instance dalam JSON format. Sebagai contoh:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator kelas
-
StockTradeGenerator memiliki metode
getRandomTrade()
yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diterapkan untuk Anda. - StockTradesWriter kelas
-
main
Metode produsen, StockTradesWriter terus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:-
Membaca nama aliran data dan nama Wilayah sebagai masukan.
-
Menggunakan
KinesisAsyncClientBuilder
untuk mengatur Region, kredensial, dan konfigurasi klien. -
Memeriksa apakah aliran ada dan aktif (jika tidak, ia keluar dengan kesalahan).
-
Dalam loop kontinu, panggil
StockTradeGenerator.getRandomTrade()
metode dan kemudiansendStockTrade
metode untuk mengirim perdagangan ke aliran setiap 100 milidetik.
sendStockTrade
MetodeStockTradesWriter
kelas memiliki kode berikut:private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }
Lihat rincian kode berikut:
-
PutRecord
APIMengharapkan array byte, dan Anda harus mengonversi perdagangan ke JSON format. Baris kode tunggal ini melakukan operasi itu:byte[] bytes = trade.toJsonAsBytes();
-
Sebelum Anda dapat mengirim perdagangan, Anda membuat
PutRecordRequest
instance baru (disebut permintaan dalam kasus ini). Masing-masingrequest
membutuhkan nama aliran, kunci partisi, dan gumpalan data.PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();
Contoh menggunakan ticker saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke aliran, lihatMenulis data ke Amazon Kinesis Data Streams.
Sekarang
request
siap untuk mengirim ke klien (operasi put):kinesisClient.putRecord(request).get();
-
Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini mencatat kondisi kesalahan:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Tambahkan blok coba/tangkap di sekitar operasi:
put
try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }
Ini karena operasi put Kinesis Data Streams dapat gagal karena kesalahan jaringan, atau karena aliran data mencapai batas throughputnya dan terhambat. Disarankan agar Anda mempertimbangkan dengan cermat kebijakan coba ulang Anda untuk
put
operasi untuk menghindari kehilangan data, seperti menggunakan coba lagi. -
Pencatatan status sangat membantu tetapi opsional:
LOG.info("Putting trade: " + trade.toString());
Produser yang ditampilkan di sini menggunakan fungsi rekaman tunggal Kinesis API Data Streams,.
PutRecord
Dalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan fungsi beberapa catatanPutRecords
dan mengirim batch catatan pada suatu waktu. Untuk informasi selengkapnya, lihat Menulis data ke Amazon Kinesis Data Streams. -
Untuk menjalankan produser
-
Verifikasi bahwa kunci akses dan key pair rahasia yang diambil Buat IAM kebijakan dan pengguna disimpan dalam file
~/.aws/credentials
. -
Jalankan
StockTradeWriter
kelas dengan argumen berikut:StockTradeStream us-west-2
Jika Anda membuat streaming di wilayah selain
us-west-2
, Anda harus menentukan wilayah tersebut di sini.
Anda akan melihat output yang serupa dengan yang berikut:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.