Gunakan API konektor Tabel - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Gunakan API konektor Tabel

Dalam model pemrograman Apache Flink, konektor adalah komponen yang digunakan aplikasi Anda untuk membaca atau menulis data dari sumber eksternal, seperti layanan lain AWS .

Dengan Apache Flink TableAPI, Anda dapat menggunakan jenis konektor berikut:

  • APISumber tabel: Anda menggunakan konektor API sumber Tabel untuk membuat tabel dalam TableEnvironment menggunakan API panggilan atau SQL kueri.

  • APIWastafel meja: Anda menggunakan SQL perintah untuk menulis data tabel ke sumber eksternal seperti MSK topik Amazon atau bucket Amazon S3.

APISumber tabel

Anda membuat sumber tabel dari aliran data. Kode berikut membuat tabel dari MSK topik Amazon:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Untuk informasi selengkapnya tentang sumber tabel, lihat Tabel & SQL Konektor di Dokumentasi Apache Flink.

APIWastafel meja

Untuk menulis data tabel ke wastafel, Anda membuat wastafelSQL, dan kemudian menjalankan wastafel SQL berbasis pada StreamTableEnvironment objek.

Contoh kode berikut mendemonstrasikan cara menulis data tabel ke sink Amazon S3:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Anda dapat menggunakan format parameter untuk mengontrol format Managed Service untuk Apache Flink yang digunakan untuk menulis output ke wastafel. Untuk informasi tentang format, lihat Konektor yang Didukung di Dokumentasi Apache Flink.

Sumber dan sink yang ditentukan pengguna

Anda dapat menggunakan konektor Apache Kafka yang ada untuk mengirim data ke dan dari AWS layanan lain, seperti Amazon MSK dan Amazon S3. Untuk berinteraksi dengan sumber data dan tujuan lainnya, Anda dapat menentukan sumber dan sink Anda sendiri. Untuk informasi selengkapnya, lihat Sumber dan Tenggelam yang ditentukan pengguna di Dokumentasi Apache Flink.