Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Memulai (Scala)
catatan
Mulai dari versi 1.15, Flink gratis Scala. Aplikasi sekarang dapat menggunakan Java API dari versi Scala apa pun. Flink masih menggunakan Scala di beberapa komponen kunci secara internal, tetapi tidak mengekspos Scala ke dalam classloader kode pengguna. Karena itu, Anda harus menambahkan dependensi Scala ke -archive Anda. JAR
Untuk informasi selengkapnya tentang perubahan Scala di Flink 1.15, lihat Scala Free
Dalam latihan ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk Scala dengan aliran Kinesis sebagai sumber dan wastafel.
Topik ini berisi bagian-bagian berikut:
Buat sumber daya yang bergantung
Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut:
Dua aliran Kinesis untuk input dan output.
Bucket Amazon S3 untuk menyimpan kode aplikasi (
ka-app-code-
)<username>
Anda dapat membuat aliran Kinesis dan bucket Amazon S3 menggunakan konsol. Untuk petunjuk membuat sumber daya ini, lihat topik berikut:
Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Beri nama aliran data
ExampleInputStream
danExampleOutputStream
Anda.Untuk membuat aliran data AWS CLI
Untuk membuat stream (
ExampleInputStream
) pertama, gunakan perintah Amazon Kinesis AWS CLI create-stream berikut.aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Untuk membuat aliran kedua yang digunakan aplikasi untuk menulis output, jalankan perintah yang sama, yang mengubah nama aliran menjadi
ExampleOutputStream
.aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Bagaimana Cara Membuat Bucket S3? di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon. Beri bucket Amazon S3 nama yang unik secara global dengan menambahkan nama login Anda, seperti
ka-app-code-
.<username>
Sumber daya lainnya
Saat Anda membuat aplikasi, Managed Service for Apache Flink akan membuat CloudWatch resource Amazon berikut jika belum ada:
Sebuah grup log yang disebut
/AWS/KinesisAnalytics-java/MyApplication
Aliran log yang disebut
kinesis-analytics-log-stream
Tulis catatan sampel ke aliran input
Di bagian ini, Anda menggunakan script Python untuk menulis catatan sampel ke aliran untuk diproses aplikasi.
catatan
Bagian ini memerlukan AWS SDK for Python (Boto)
catatan
Skrip Python di bagian ini menggunakan AWS CLI. Anda harus mengonfigurasi AWS CLI untuk menggunakan kredensyal akun dan wilayah default Anda. Untuk mengkonfigurasi Anda AWS CLI, masukkan yang berikut ini:
aws configure
-
Buat file bernama
stock.py
dengan konten berikut:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
Jalankan skrip
stock.py
.$ python stock.py
Biarkan skrip tetap berjalan saat menyelesaikan sisa tutorial.
Unduh dan periksa kode aplikasi
Kode aplikasi Python untuk contoh ini tersedia dari. GitHub Untuk mengunduh kode aplikasi, lakukan hal berikut:
Instal klien Git jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Menginstal Git
. Klon repositori jarak jauh dengan perintah berikut:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Buka direktori
amazon-kinesis-data-analytics-java-examples/scala/GettingStarted
tersebut.
Perhatikan hal tentang kode aplikasi berikut:
build.sbt
File berisi informasi tentang konfigurasi dan dependensi aplikasi, termasuk Layanan Terkelola untuk pustaka Apache Flink.BasicStreamingJob.scala
File berisi metode utama yang mendefinisikan fungsionalitas aplikasi.Aplikasi menggunakan sumber Kinesis untuk membaca dari aliran sumber. Cuplikan berikut ini membuat sumber Kinesis:
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }
Aplikasi ini juga menggunakan sink Kinesis untuk menulis ke dalam aliran hasil. Cuplikan berikut membuat sink Kinesis:
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
Aplikasi ini membuat konektor sumber dan wastafel untuk mengakses sumber daya eksternal menggunakan StreamExecutionEnvironment objek.
Aplikasi ini membuat konektor sumber dan wastafel menggunakan properti aplikasi dinamis. Properti aplikasi runtime dibaca untuk mengkonfigurasi konektor. Untuk informasi selengkapnya tentang properti runtime, lihat Properti Runtime.
Kompilasi dan unggah kode aplikasi
Di bagian ini, Anda mengkompilasi dan mengunggah kode aplikasi Anda ke bucket Amazon S3 yang Anda buat di Buat sumber daya yang bergantung bagian tersebut.
Kompilasi Kode Aplikasi
Di bagian ini, Anda menggunakan alat SBT
Untuk menggunakan kode aplikasi Anda, Anda mengkompilasi dan mengemasnya ke dalam JAR file. Anda dapat mengkompilasi dan mengemas kode Anda denganSBT:
sbt assembly
-
Jika aplikasi berhasil mengompilasi, file berikut dibuat:
target/scala-3.2.0/getting-started-scala-1.0.jar
Unggah Kode Scala Streaming Apache Flink
Di bagian ini, Anda membuat bucket Amazon S3 dan mengunggah kode aplikasi Anda.
Buka konsol Amazon S3 di. https://console.aws.amazon.com/s3/
Pilih Buat ember
Masukkan
ka-app-code-<username>
di bidang Bucket name (Nama bucket). Tambahkan sufiks ke nama bucket, seperti nama pengguna Anda, untuk membuatnya unik secara global. Pilih Next (Selanjutnya).Di opsi Konfigurasi, pertahankan pengaturan apa adanya, dan pilih Berikutnya.
Di Setel izin, pertahankan pengaturan apa adanya, dan pilih Berikutnya.
Pilih Buat bucket.
Pilih
ka-app-code-<username>
bucket, lalu pilih Unggah.-
Di langkah Pilih file, pilih Add files (Tambahkan berkas). Navigasikan ke file
getting-started-scala-1.0.jar
yang Anda buat di langkah sebelumnya. Anda tidak perlu mengubah pengaturan apa pun untuk objek, jadi pilih Upload (Unggah).
Kode aplikasi Anda sekarang disimpan di bucket Amazon S3 yang dapat diakses aplikasi Anda.