Implementasikan produsen - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Implementasikan produsen

Aplikasi dalam Tutorial: Memproses data stok real-time menggunakan KPL dan KCL 1.x menggunakan skenario dunia nyata pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukung.

Lihat kode sumber dan tinjau informasi berikut.

StockTrade kelas

Perdagangan saham individu diwakili oleh contoh StockTrade kelas. Contoh ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diterapkan untuk Anda.

Rekam aliran

Aliran adalah urutan catatan. Rekaman adalah serialisasi StockTrade instance dalam JSON format. Sebagai contoh:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator kelas

StockTradeGeneratormemiliki metode getRandomTrade() yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diterapkan untuk Anda.

StockTradesWriter kelas

mainMetode produsen, StockTradesWriter terus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:

  1. Membaca nama aliran dan nama Wilayah sebagai masukan.

  2. Menciptakan sebuahAmazonKinesisClientBuilder.

  3. Menggunakan pembuat klien untuk mengatur Region, kredensial, dan konfigurasi klien.

  4. Membangun AmazonKinesis klien menggunakan pembuat klien.

  5. Memeriksa apakah aliran ada dan aktif (jika tidak, ia keluar dengan kesalahan).

  6. Dalam loop kontinu, panggil StockTradeGenerator.getRandomTrade() metode dan kemudian sendStockTrade metode untuk mengirim perdagangan ke aliran setiap 100 milidetik.

sendStockTradeMetode StockTradesWriter kelas memiliki kode berikut:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Lihat rincian kode berikut:

  • PutRecordAPIMengharapkan array byte, dan Anda harus mengkonversi trade ke JSON format. Baris kode tunggal ini melakukan operasi itu:

    byte[] bytes = trade.toJsonAsBytes();
  • Sebelum Anda dapat mengirim perdagangan, Anda membuat PutRecordRequest instance baru (dipanggil putRecord dalam kasus ini):

    PutRecordRequest putRecord = new PutRecordRequest();

    Setiap PutRecord panggilan memerlukan nama aliran, kunci partisi, dan gumpalan data. Kode berikut mengisi bidang-bidang ini dalam putRecord objek menggunakan setXxxx() metodenya:

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    Contoh menggunakan tiket saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke aliran, lihatTambahkan data ke aliran.

    Sekarang putRecord siap untuk mengirim ke klien (putoperasi):

    kinesisClient.putRecord(putRecord);
  • Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini mencatat kondisi kesalahan:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Tambahkan blok coba/tangkap di sekitar operasi: put

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Ini karena operasi Kinesis put Data Streams dapat gagal karena kesalahan jaringan, atau karena aliran mencapai batas throughputnya dan terhambat. Sebaiknya pertimbangkan dengan cermat kebijakan coba ulang Anda untuk put operasi untuk menghindari kehilangan data, seperti menggunakan coba lagi.

  • Pencatatan status sangat membantu tetapi opsional:

    LOG.info("Putting trade: " + trade.toString());

Produser yang ditampilkan di sini menggunakan fungsi rekaman tunggal Kinesis API Data Streams,. PutRecord Dalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan fungsi beberapa catatan PutRecords dan mengirim batch catatan pada suatu waktu. Untuk informasi selengkapnya, lihat Tambahkan data ke aliran.

Untuk menjalankan produser
  1. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat IAM pengguna) disimpan dalam file~/.aws/credentials.

  2. Jalankan StockTradeWriter kelas dengan argumen berikut:

    StockTradeStream us-west-2

    Jika Anda membuat streaming di Wilayah selainus-west-2, Anda harus menentukan Wilayah tersebut di sini.

Anda akan melihat output yang serupa dengan yang berikut:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Aliran perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.

Langkah selanjutnya

Implementasikan konsumen