Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
資料表API連接器
在 Apache Flink 程式設計模型中,連接器是應用程式用來從外部來源讀取或寫入資料的元件,例如其他服務 AWS 。
透過 Apache Flink 資料表 API,您可以使用下列類型的連接器:
資料表API來源
您可以從資料串流建立資料表來源。下列程式碼會從 Amazon MSK主題建立資料表:
//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);
如需資料表來源的詳細資訊,請參閱 Apache Flink 文件中的資料表和SQL連接器
資料表API接收器
若要將資料表資料寫入 接收端,請在 中建立接收端SQL,然後在 StreamTableEnvironment
物件上執行 SQL型接收端。
下列程式碼範例示範如何將資料表資料寫入 Amazon S3 接收器:
final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");
您可以使用 format
參數來控制 Managed Service for Apache Flink 用來將輸出寫入接收器的格式。如需有關格式的資訊,請參閱 Apache Flink 文件中的支援的連接器
使用者定義的來源和接收器
您可以使用現有的 Apache Kafka 連接器,在 Amazon MSK和 Amazon S3 等 AWS 其他服務之間傳送資料。若要與其他資料來源和目的地互動,您可以定義自己的來源和接收器。如需詳細資訊,請參閱 Apache Flink 文件中的使用者定義來源和接收