資料表API連接器 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

資料表API連接器

在 Apache Flink 程式設計模型中,連接器是應用程式用來從外部來源讀取或寫入資料的元件,例如其他服務 AWS 。

透過 Apache Flink 資料表 API,您可以使用下列類型的連接器:

  • 資料表API來源:您可以使用資料表API來源連接器,TableEnvironment使用API呼叫或SQL查詢在 中建立資料表。

  • 資料表API接收器:您可以使用SQL命令將資料表資料寫入外部來源,例如 Amazon MSK主題或 Amazon S3 儲存貯體。

資料表API來源

您可以從資料串流建立資料表來源。下列程式碼會從 Amazon MSK主題建立資料表:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

如需資料表來源的詳細資訊,請參閱 Apache Flink 文件中的資料表和SQL連接器

資料表API接收器

若要將資料表資料寫入 接收端,請在 中建立接收端SQL,然後在 StreamTableEnvironment 物件上執行 SQL型接收端。

下列程式碼範例示範如何將資料表資料寫入 Amazon S3 接收器:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

您可以使用 format 參數來控制 Managed Service for Apache Flink 用來將輸出寫入接收器的格式。如需有關格式的資訊,請參閱 Apache Flink 文件中的支援的連接器

使用者定義的來源和接收器

您可以使用現有的 Apache Kafka 連接器,在 Amazon MSK和 Amazon S3 等 AWS 其他服務之間傳送資料。若要與其他資料來源和目的地互動,您可以定義自己的來源和接收器。如需詳細資訊,請參閱 Apache Flink 文件中的使用者定義來源和接收