Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Membuat dan menjalankan Managed Service untuk aplikasi Apache Flink
Pada langkah ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink dengan aliran data Kinesis sebagai sumber dan wastafel.
Bagian ini berisi langkah-langkah berikut.
- Buat sumber daya yang bergantung
- Siapkan lingkungan pengembangan lokal Anda
- Unduh dan periksa kode Java streaming Apache Flink
- Tulis catatan sampel ke aliran input
- Jalankan aplikasi Anda secara lokal
- Amati data input dan output dalam aliran Kinesis
- Menghentikan aplikasi Anda berjalan secara lokal
- Kompilasi dan paket kode aplikasi Anda
- Unggah JAR file kode aplikasi
- Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink
- Langkah selanjutnya
Buat sumber daya yang bergantung
Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut:
-
Dua aliran data Kinesis untuk input dan output
-
Bucket Amazon S3 untuk menyimpan kode aplikasi
catatan
Tutorial ini mengasumsikan bahwa Anda menerapkan aplikasi Anda di wilayah us-east-1 US East (N. Virginia). Jika Anda menggunakan Wilayah lain, sesuaikan semua langkah yang sesuai.
Buat dua aliran data Amazon Kinesis
Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, buat dua aliran data Kinesis (dan). ExampleInputStream
ExampleOutputStream
Aplikasi Anda menggunakan aliran ini untuk sumber aplikasi dan aliran tujuan.
Anda dapat membuat aliran ini menggunakan konsol Amazon Kinesis atau perintah berikut. AWS CLI Untuk instruksi konsol, lihat Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Untuk membuat aliran menggunakan AWS CLI, gunakan perintah berikut, sesuaikan dengan Wilayah yang Anda gunakan untuk aplikasi Anda.
Untuk membuat aliran data AWS CLI
-
Untuk membuat stream (
ExampleInputStream
) pertama, gunakan perintah Amazon Kinesiscreate-stream
AWS CLI berikut:$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
-
Untuk membuat aliran kedua yang digunakan aplikasi untuk menulis output, jalankan perintah yang sama, ubah nama aliran menjadi
ExampleOutputStream
:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
Buat bucket Amazon S3 untuk kode aplikasi
Anda dapat membuat bucket Amazon S3 menggunakan konsol. Untuk mempelajari cara membuat bucket Amazon S3 menggunakan konsol, lihat Membuat bucket di Panduan Pengguna Amazon S3. Beri nama bucket Amazon S3 menggunakan nama yang unik secara global, misalnya dengan menambahkan nama login Anda.
catatan
Pastikan Anda membuat bucket di Region yang Anda gunakan untuk tutorial ini (us-east-1).
Sumber daya lainnya
Saat Anda membuat aplikasi, Managed Service for Apache Flink secara otomatis membuat CloudWatch resource Amazon berikut jika belum ada:
-
Sebuah grup log yang disebut
/AWS/KinesisAnalytics-java/<my-application>
-
Aliran log yang disebut
kinesis-analytics-log-stream
Siapkan lingkungan pengembangan lokal Anda
Untuk pengembangan dan debugging, Anda dapat menjalankan aplikasi Apache Flink di mesin Anda langsung dari pilihan AndaIDE. Setiap dependensi Apache Flink ditangani seperti dependensi Java biasa menggunakan Apache Maven.
catatan
Pada mesin pengembangan Anda, Anda harus menginstal Java JDK 11, Maven, dan Git. Kami menyarankan Anda menggunakan lingkungan pengembangan seperti Eclipse Java Neon atau
Otentikasi sesi Anda AWS
Aplikasi ini menggunakan aliran data Kinesis untuk mempublikasikan data. Saat berjalan secara lokal, Anda harus memiliki sesi AWS otentikasi yang valid dengan izin untuk menulis ke aliran data Kinesis. Gunakan langkah-langkah berikut untuk mengautentikasi sesi Anda:
-
Jika Anda tidak memiliki AWS CLI dan profil bernama dengan kredensi valid yang dikonfigurasi, lihatMengatur AWS Command Line Interface (AWS CLI).
-
Verifikasi bahwa Anda AWS CLI telah dikonfigurasi dengan benar dan pengguna Anda memiliki izin untuk menulis ke aliran data Kinesis dengan menerbitkan catatan pengujian berikut:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Jika Anda IDE memiliki plugin untuk diintegrasikan AWS, Anda dapat menggunakannya untuk meneruskan kredensil ke aplikasi yang berjalan di file. IDE Untuk informasi selengkapnya, lihat AWS Toolkit untuk IDEA AWS IntelliJ dan Toolkit for
Eclipse.
Unduh dan periksa kode Java streaming Apache Flink
Kode aplikasi Java untuk contoh ini tersedia dari GitHub. Untuk mengunduh kode aplikasi, lakukan hal berikut:
-
Kloning repositori jarak jauh menggunakan perintah berikut:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Buka direktori
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted
tersebut.
Tinjau komponen aplikasi
Aplikasi ini sepenuhnya diimplementasikan di com.amazonaws.services.msf.BasicStreamingJob
kelas. main()
Metode ini mendefinisikan aliran data untuk memproses data streaming dan menjalankannya.
catatan
Untuk pengalaman pengembang yang dioptimalkan, aplikasi ini dirancang untuk berjalan tanpa perubahan kode apa pun baik di Amazon Managed Service untuk Apache Flink maupun secara lokal, untuk pengembangan di Anda. IDE
-
Untuk membaca konfigurasi runtime sehingga akan berfungsi saat berjalan di Amazon Managed Service untuk Apache Flink dan di aplikasi AndaIDE, aplikasi secara otomatis mendeteksi apakah itu berjalan mandiri secara lokal di. IDE Dalam hal ini, aplikasi memuat konfigurasi runtime secara berbeda:
-
Saat aplikasi mendeteksi bahwa aplikasi berjalan dalam mode mandiri di AndaIDE, bentuk
application_properties.json
file yang disertakan dalam folder sumber daya proyek. Isi file berikut. -
Saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink, perilaku default memuat konfigurasi aplikasi dari properti runtime yang akan Anda tentukan di Amazon Managed Service untuk aplikasi Apache Flink. Lihat Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()
Metode ini mendefinisikan aliran data aplikasi dan menjalankannya.-
Menginisialisasi lingkungan streaming default. Dalam contoh ini, kami menunjukkan cara membuat kedua
StreamExecutionEnvironment
yang akan digunakan dengan DataSteam API dan yangStreamTableEnvironment
akan digunakan dengan SQL dan TabelAPI. Dua objek lingkungan adalah dua referensi terpisah ke lingkungan runtime yang sama, untuk menggunakan yang berbedaAPIs.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Muat parameter konfigurasi aplikasi. Ini akan secara otomatis memuatnya dari tempat yang benar, tergantung di mana aplikasi berjalan:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
Aplikasi mendefinisikan sumber menggunakan konektor Konsumen Kinesis
untuk membaca data dari aliran input. Konfigurasi aliran input didefinisikan dalam PropertyGroupId
=InputStream0
. Nama dan Wilayah aliran berada di properti bernamastream.name
danaws.region
masing-masing. Untuk mempermudah, sumber ini membaca catatan sebagai string.private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
-
Aplikasi kemudian mendefinisikan wastafel menggunakan konektor Kinesis Streams
Sink untuk mengirim data ke aliran output. Nama aliran keluaran dan Wilayah didefinisikan dalam PropertyGroupId
=OutputStream0
, mirip dengan aliran input. Wastafel terhubung langsung ke internalDataStream
yang mendapatkan data dari sumbernya. Dalam aplikasi nyata, Anda memiliki beberapa transformasi antara sumber dan wastafel.private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
-
Akhirnya, Anda menjalankan aliran data yang baru saja Anda tentukan. Ini harus menjadi instruksi terakhir dari
main()
metode ini, setelah Anda mendefinisikan semua operator aliran data membutuhkan:env.execute("Flink streaming Java API skeleton");
-
Gunakan file pom.xml
File pom.xml mendefinisikan semua dependensi yang diperlukan oleh aplikasi dan menyiapkan plugin Maven Shade untuk membangun toples lemak yang berisi semua dependensi yang diperlukan oleh Flink.
-
Beberapa dependensi memiliki
provided
ruang lingkup. Dependensi ini secara otomatis tersedia saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink. Mereka diminta untuk mengkompilasi aplikasi, atau untuk menjalankan aplikasi secara lokal di Anda. IDE Untuk informasi selengkapnya, lihat Jalankan aplikasi Anda secara lokal. Pastikan Anda menggunakan versi Flink yang sama dengan runtime yang akan Anda gunakan di Amazon Managed Service untuk Apache Flink.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Anda harus menambahkan dependensi Apache Flink tambahan ke pom dengan cakupan default, seperti konektor Kinesis
yang digunakan oleh aplikasi ini. Untuk informasi selengkapnya, lihat Gunakan konektor Apache Flink. Anda juga dapat menambahkan dependensi Java tambahan yang diperlukan oleh aplikasi Anda. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Plugin Maven Java Compiler memastikan bahwa kode dikompilasi terhadap Java 11, JDK versi yang saat ini didukung oleh Apache Flink.
-
Plugin Maven Shade mengemas toples lemak, tidak termasuk beberapa pustaka yang disediakan oleh runtime. Ini juga menentukan dua transformer: dan.
ServicesResourceTransformer
ManifestResourceTransformer
Yang terakhir mengkonfigurasi kelas yang berisimain
metode untuk memulai aplikasi. Jika Anda mengganti nama kelas utama, jangan lupa untuk memperbarui transformator ini. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Tulis catatan sampel ke aliran input
Di bagian ini, Anda akan mengirim catatan sampel ke aliran untuk aplikasi untuk diproses. Anda memiliki dua opsi untuk menghasilkan data sampel, baik menggunakan skrip Python atau Kinesis
Menghasilkan data sampel menggunakan skrip Python
Anda dapat menggunakan skrip Python untuk mengirim catatan sampel ke aliran.
catatan
Untuk menjalankan skrip Python ini, Anda harus menggunakan Python 3.x dan menginstal pustaka for AWS SDKPython
Untuk mulai mengirim data uji ke aliran input Kinesis:
-
Unduh skrip
stock.py
Python generator data dari repositori generator GitHub Data. -
Jalankan skrip
stock.py
.$ python stock.py
Jaga agar skrip tetap berjalan saat Anda menyelesaikan sisa tutorial. Anda sekarang dapat menjalankan aplikasi Apache Flink Anda.
Hasilkan data sampel menggunakan Kinesis Data Generator
Atau menggunakan skrip Python, Anda dapat menggunakan Kinesis Data Generator
Untuk mengatur dan menjalankan Kinesis Data Generator:
-
Ikuti petunjuk dalam dokumentasi Kinesis Data Generator
untuk mengatur akses ke alat. Anda akan menjalankan AWS CloudFormation template yang mengatur pengguna dan kata sandi. -
Akses Kinesis Data Generator melalui yang URL dihasilkan oleh template. CloudFormation Anda dapat menemukan URL di Output tab setelah CloudFormation template selesai.
-
Konfigurasikan generator data:
-
Wilayah: Pilih Wilayah yang Anda gunakan untuk tutorial ini: us-east-1
-
Stream/streaming pengiriman: Pilih aliran input yang akan digunakan aplikasi:
ExampleInputStream
-
Catatan per detik: 100
-
Rekam templat: Salin dan tempel templat berikut:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Uji template: Pilih template Uji dan verifikasi bahwa catatan yang dihasilkan mirip dengan yang berikut:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Mulai generator data: Pilih Pilih Kirim Data.
Kinesis Data Generator sekarang mengirimkan data ke file. ExampleInputStream
Jalankan aplikasi Anda secara lokal
Anda dapat menjalankan dan men-debug aplikasi Flink Anda secara lokal di aplikasi Anda. IDE
catatan
Sebelum melanjutkan, verifikasi bahwa aliran input dan output tersedia. Lihat Buat dua aliran data Amazon Kinesis. Juga, verifikasi bahwa Anda memiliki izin untuk membaca dan menulis dari kedua aliran. Lihat Otentikasi sesi Anda AWS.
Menyiapkan lingkungan pengembangan lokal membutuhkan Java 11JDK, Apache Maven, dan dan IDE untuk pengembangan Java. Verifikasi bahwa Anda memenuhi prasyarat yang diperlukan. Lihat Memenuhi prasyarat untuk menyelesaikan latihan.
Impor proyek Java ke IDE
Untuk mulai mengerjakan aplikasi di AndaIDE, Anda harus mengimpornya sebagai proyek Java.
Repositori yang Anda kloning berisi beberapa contoh. Setiap contoh adalah proyek terpisah. Untuk tutorial ini, impor konten dalam ./java/GettingStarted
subdirektori ke dalam AndaIDE.
Masukkan kode sebagai proyek Java yang ada menggunakan Maven.
catatan
Proses yang tepat untuk mengimpor proyek Java baru bervariasi tergantung pada yang IDE Anda gunakan.
Periksa konfigurasi aplikasi lokal
Saat berjalan secara lokal, aplikasi menggunakan konfigurasi dalam application_properties.json
file di folder sumber daya proyek di bawah./src/main/resources
. Anda dapat mengedit file ini untuk menggunakan nama atau Wilayah aliran Kinesis yang berbeda.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Siapkan konfigurasi IDE run Anda
Anda dapat menjalankan dan men-debug aplikasi Flink dari Anda IDE secara langsung dengan menjalankan kelas utamacom.amazonaws.services.msf.BasicStreamingJob
, karena Anda akan menjalankan aplikasi Java apa pun. Sebelum menjalankan aplikasi, Anda harus mengatur konfigurasi Run. Pengaturan tergantung pada yang IDE Anda gunakan. Misalnya, lihat konfigurasi Jalankan/debug dalam
-
Tambahkan
provided
dependensi ke classpath. Ini diperlukan untuk memastikan bahwa dependensi denganprovided
cakupan diteruskan ke aplikasi saat berjalan secara lokal. Tanpa pengaturan ini, aplikasi segera menampilkanclass not found
kesalahan. -
Lulus AWS kredensi untuk mengakses aliran Kinesis ke aplikasi. Cara tercepat adalah dengan menggunakan AWS Toolkit untuk IDEA IntelliJ
. Menggunakan IDE plugin ini dalam konfigurasi Run, Anda dapat memilih AWS profil tertentu. AWS otentikasi terjadi menggunakan profil ini. Anda tidak perlu memberikan AWS kredensil secara langsung. -
Verifikasi bahwa IDE menjalankan aplikasi menggunakan JDK11.
Jalankan aplikasi di IDE
Setelah Anda mengatur konfigurasi Run untukBasicStreamingJob
, Anda dapat menjalankan atau men-debug seperti aplikasi Java biasa.
catatan
Anda tidak dapat menjalankan toples lemak yang dihasilkan oleh Maven langsung dengan java -jar
...
dari baris perintah. Toples ini tidak berisi dependensi inti Flink yang diperlukan untuk menjalankan aplikasi mandiri.
Ketika aplikasi dimulai dengan sukses, ia mencatat beberapa informasi tentang minicluster mandiri dan inisialisasi konektor. Ini diikuti oleh sejumlah INFO dan beberapa WARN log yang biasanya dipancarkan Flink saat aplikasi dimulai.
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
Setelah inisialisasi selesai, aplikasi tidak memancarkan entri log lebih lanjut. Saat data mengalir, tidak ada log yang dipancarkan.
Untuk memverifikasi apakah aplikasi memproses data dengan benar, Anda dapat memeriksa aliran Kinesis input dan output, seperti yang dijelaskan di bagian berikut.
catatan
Tidak memancarkan log tentang data yang mengalir adalah perilaku normal untuk aplikasi Flink. Memancarkan log pada setiap catatan mungkin nyaman untuk debugging, tetapi dapat menambahkan overhead yang cukup besar saat berjalan dalam produksi.
Amati data input dan output dalam aliran Kinesis
Anda dapat mengamati catatan yang dikirim ke aliran input oleh (menghasilkan sampel Python) atau Kinesis Data Generator (link) dengan menggunakan Data Viewer di konsol Amazon Kinesis.
Untuk mengamati catatan
Buka konsol Kinesis di /kinesis. https://console.aws.amazon.com
-
Verifikasi bahwa Region sama dengan tempat Anda menjalankan tutorial ini, yaitu us-east-1 US East (Virginia N.) secara default. Ubah Wilayah jika tidak cocok.
-
Pilih Aliran Data.
-
Pilih aliran yang ingin Anda amati, salah satu
ExampleInputStream
atauExampleOutputStream.
-
Pilih tab Penampil data.
-
Pilih Shard apa saja, simpan Terbaru sebagai posisi Awal, lalu pilih Dapatkan catatan. Anda mungkin melihat kesalahan “Tidak ada catatan ditemukan untuk permintaan ini”. Jika demikian, pilih Coba lagi mendapatkan catatan. Catatan terbaru yang diterbitkan ke tampilan streaming.
-
Pilih nilai di kolom Data untuk memeriksa konten rekaman dalam JSON format.
Menghentikan aplikasi Anda berjalan secara lokal
Hentikan aplikasi yang berjalan di AndaIDE. IDEBiasanya menyediakan opsi “berhenti”. Lokasi dan metode yang tepat tergantung pada yang IDE Anda gunakan.
Kompilasi dan paket kode aplikasi Anda
Di bagian ini, Anda menggunakan Apache Maven untuk mengkompilasi kode Java dan mengemasnya ke dalam file. JAR Anda dapat mengkompilasi dan mengemas kode Anda menggunakan alat baris perintah Maven atau Anda. IDE
Untuk mengkompilasi dan paket menggunakan baris perintah Maven:
Pindah ke direktori yang berisi GettingStarted proyek Java dan jalankan perintah berikut:
$ mvn package
Untuk mengkompilasi dan mengemas IDE menggunakan:
Jalankan mvn package
dari integrasi IDE Maven Anda.
Dalam kedua kasus, JAR file berikut dibuat:target/amazon-msf-java-stream-app-1.0.jar
.
catatan
Menjalankan “build project” dari Anda IDE mungkin tidak membuat JAR file.
Unggah JAR file kode aplikasi
Di bagian ini, Anda mengunggah JAR file yang Anda buat di bagian sebelumnya ke bucket Amazon Simple Storage Service (Amazon S3) Simple Storage Service (Amazon S3) yang Anda buat di awal tutorial ini. Jika Anda belum menyelesaikan langkah ini, lihat (tautan).
Untuk mengunggah JAR file kode aplikasi
Buka konsol Amazon S3 di. https://console.aws.amazon.com/s3/
-
Pilih bucket yang sebelumnya Anda buat untuk kode aplikasi.
-
Pilih Unggah.
-
Pilih Tambahkan file.
-
Arahkan ke JAR file yang dihasilkan pada langkah sebelumnya:
target/amazon-msf-java-stream-app-1.0.jar
. -
Pilih Unggah tanpa mengubah pengaturan lainnya.
Awas
Pastikan Anda memilih JAR file yang benar<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar
.
target
Direktori ini juga berisi JAR file lain yang tidak perlu Anda unggah.
Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink
Anda dapat membuat dan menjalankan Layanan Terkelola untuk aplikasi Apache Flink menggunakan konsol atau aplikasi. AWS CLI Untuk tutorial ini, Anda akan menggunakan konsol.
catatan
Saat Anda membuat aplikasi menggunakan konsol, resource AWS Identity and Access Management (IAM) dan Amazon CloudWatch Logs dibuat untuk Anda. Saat Anda membuat aplikasi menggunakan AWS CLI, Anda membuat sumber daya ini secara terpisah.
Topik
Buat aplikasi
Untuk membuat aplikasi
Buka Layanan Terkelola untuk konsol Apache Flink di /flink https://console.aws.amazon.com
-
Verifikasi bahwa Wilayah yang benar dipilih: us-east-1 US East (Virginia N.)
-
Buka menu di sebelah kanan dan pilih Apache Flink Applications dan kemudian Buat aplikasi streaming. Atau, pilih Buat aplikasi streaming di wadah Memulai halaman awal.
-
Di halaman Buat aplikasi streaming:
-
Pilih metode untuk mengatur aplikasi pemrosesan aliran: pilih Buat dari awal.
-
Konfigurasi Apache Flink, versi Aplikasi Flink: pilih Apache Flink 1.19.
-
-
Konfigurasikan aplikasi Anda
-
Nama aplikasi: masukkan
MyApplication
. -
Deskripsi: masuk
My java test app
. -
Akses ke sumber daya aplikasi: pilih Buat/IAMperbarui peran
kinesis-analytics-MyApplication-us-east-1
dengan kebijakan yang diperlukan.
-
-
Konfigurasikan Template Anda untuk pengaturan aplikasi
-
Template: pilih Pengembangan.
-
-
Pilih Buat aplikasi streaming di bagian bawah halaman.
catatan
Saat membuat aplikasi Managed Service for Apache Flink menggunakan konsol, Anda memiliki opsi untuk membuat IAM peran dan kebijakan untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. IAMSumber daya ini diberi nama menggunakan nama aplikasi dan Wilayah Anda sebagai berikut:
-
Kebijakan:
kinesis-analytics-service-
MyApplication
-us-east-1
-
Peran:
kinesisanalytics-
MyApplication
-us-east-1
Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Kinesis Data Analytics. Nama sumber daya yang dibuat secara otomatis diawali kinesis-analytics-
untuk kompatibilitas mundur.
Edit IAM kebijakan
Edit IAM kebijakan untuk menambahkan izin untuk mengakses aliran data Kinesis.
Untuk mengedit kebijakan
Buka IAM konsol di https://console.aws.amazon.com/iam/
. -
Pilih Policies (Kebijakan). Pilih kebijakan
kinesis-analytics-service-MyApplication-us-east-1
yang dibuat konsol untuk Anda di bagian sebelumnya. -
Pilih Edit dan kemudian pilih JSONtab.
-
Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti akun sampel IDs (
012345678901
) dengan ID akun Anda.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Pilih Berikutnya di bagian bawah halaman dan kemudian pilih Simpan perubahan.
Konfigurasikan aplikasi
Edit konfigurasi aplikasi untuk mengatur artefak kode aplikasi.
Untuk mengedit konfigurasi
-
Pada MyApplicationhalaman, pilih Konfigurasi.
-
Di bagian Lokasi kode aplikasi:
-
Untuk bucket Amazon S3, pilih bucket yang sebelumnya Anda buat untuk kode aplikasi. Pilih Browse dan pilih bucket yang benar, lalu pilih Pilih. Jangan klik nama bucket.
-
Untuk Jalur ke objek Amazon S3, masukkan
amazon-msf-java-stream-app-1.0.jar
.
-
-
Untuk izin Akses, pilih Buat/perbarui IAM peran
kinesis-analytics-MyApplication-us-east-1
dengan kebijakan yang diperlukan. -
Di bagian properti Runtime, tambahkan properti berikut.
-
Pilih Tambahkan item baru dan tambahkan masing-masing parameter berikut:
ID Grup Kunci Nilai InputStream0
stream.name
ExampleInputStream
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
-
Jangan memodifikasi bagian lainnya.
-
Pilih Simpan perubahan.
catatan
Saat Anda memilih untuk mengaktifkan CloudWatch pencatatan Amazon, Layanan Terkelola untuk Apache Flink membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:
-
Grup log:
/aws/kinesis-analytics/MyApplication
-
Aliran log:
kinesis-analytics-log-stream
Jalankan aplikasi
Aplikasi sekarang dikonfigurasi dan siap dijalankan.
Untuk menjalankan aplikasi
-
Di konsol untuk Amazon Managed Service untuk Apache Flink, pilih Aplikasi Saya dan pilih Jalankan.
-
Pada halaman berikutnya, halaman konfigurasi Pemulihan aplikasi, pilih Jalankan dengan snapshot terbaru dan kemudian pilih Jalankan.
Status dalam Aplikasi merinci transisi dari
Ready
keStarting
dan kemudian keRunning
saat aplikasi telah dimulai.
Saat aplikasi dalam Running
status, Anda sekarang dapat membuka dasbor Flink.
Untuk membuka dasbor
-
Pilih Buka dasbor Apache Flink. Dasbor terbuka di halaman baru.
-
Dalam daftar pekerjaan Runing, pilih satu pekerjaan yang dapat Anda lihat.
catatan
Jika Anda menyetel properti Runtime atau mengedit IAM kebijakan secara tidak benar, status aplikasi mungkin berubah menjadi
Running
, tetapi dasbor Flink menunjukkan bahwa pekerjaan terus dimulai ulang. Ini adalah skenario kegagalan umum jika aplikasi salah konfigurasi atau tidak memiliki izin untuk mengakses sumber daya eksternal.Ketika ini terjadi, periksa tab Pengecualian di dasbor Flink untuk melihat penyebab masalah.
Amati metrik aplikasi yang sedang berjalan
Pada MyApplicationhalaman, di bagian CloudWatch metrik Amazon, Anda dapat melihat beberapa metrik dasar dari aplikasi yang sedang berjalan.
Untuk melihat metrik
-
Di sebelah tombol Refresh, pilih 10 detik dari daftar dropdown.
-
Saat aplikasi berjalan dan sehat, Anda dapat melihat metrik uptime terus meningkat.
-
Metrik fullrestart harus nol. Jika meningkat, konfigurasi mungkin memiliki masalah. Untuk menyelidiki masalah ini, tinjau tab Pengecualian di dasbor Flink.
-
Jumlah metrik pos pemeriksaan yang gagal harus nol dalam aplikasi yang sehat.
catatan
Dasbor ini menampilkan satu set metrik tetap dengan perincian 5 menit. Anda dapat membuat dasbor aplikasi khusus dengan metrik apa pun di CloudWatch dasbor.
Amati data keluaran dalam aliran Kinesis
Pastikan Anda masih mempublikasikan data ke input, baik menggunakan script Python atau Kinesis Data Generator.
Anda sekarang dapat mengamati output dari aplikasi yang berjalan pada Managed Service untuk Apache Flink dengan menggunakan Data Viewer di https://console.aws.amazon.com/kinesis/
Untuk melihat output
Buka konsol Kinesis di /kinesis. https://console.aws.amazon.com
-
Verifikasi bahwa Region sama dengan yang Anda gunakan untuk menjalankan tutorial ini. Secara default, itu adalah AS-Timur-1us Timur (Virginia N.). Ubah Wilayah jika perlu.
-
Pilih Aliran Data.
-
Pilih aliran yang ingin Anda amati. Untuk tutorial ini, gunakan
ExampleOutputStream
. -
Pilih tab Penampil data.
-
Pilih Shard apa saja, simpan Terbaru sebagai posisi Awal, lalu pilih Dapatkan catatan. Anda mungkin melihat kesalahan “tidak ada catatan ditemukan untuk permintaan ini”. Jika demikian, pilih Coba lagi mendapatkan catatan. Catatan terbaru yang diterbitkan ke tampilan streaming.
-
Pilih nilai di kolom Data untuk memeriksa konten catatan dalam JSON format.
Hentikan aplikasi
Untuk menghentikan aplikasi, buka halaman konsol dari Layanan Terkelola untuk aplikasi Apache Flink bernama. MyApplication
Untuk menghentikan aplikasi
-
Dari daftar dropdown Action, pilih Stop.
-
Status dalam Aplikasi merinci transisi dari
Running
keStopping
, dan kemudian keReady
saat aplikasi benar-benar dihentikan.catatan
Jangan lupa juga untuk berhenti mengirim data ke input stream dari script Python atau Kinesis Data Generator.