Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Apache Flink 编程模型中,连接器是应用程序用来从外部源(例如其他 AWS 服务)读取或写入数据的组件。
使用 Apache Flink Table API,您可以使用以下类型的连接器:
表 API 来源
您可以从数据流创建表源。以下代码根据Amazon MSK 主题创建表:
//create the table
final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();
//Obtain stream
DataStream<StockRecord> events = env.addSource(consumer);
Table table = streamTableEnvironment.fromDataStream(events);
有关表源的更多信息,请参阅 Apache Flink 文档中的表和 SQL 连接器
表 API 接收器
要将表数据写入接收器,可以在 SQL 中创建接收器,然后在对象上运行基于 SQL 的StreamTableEnvironment
接收器。
以下代码示例演示了如何将表数据写入 Amazon S3 接收器:
final String s3Sink = "CREATE TABLE sink_table (" +
"event_time TIMESTAMP," +
"ticker STRING," +
"price DOUBLE," +
"dt STRING," +
"hr STRING" +
")" +
" PARTITIONED BY (ticker,dt,hr)" +
" WITH" +
"(" +
" 'connector' = 'filesystem'," +
" 'path' = '" + s3Path + "'," +
" 'format' = 'json'" +
") ";
//send to s3
streamTableEnvironment.executeSql(s3Sink);
filteredTable.executeInsert("sink_table");
您可以使用format
参数来控制 Managed Service for Apache Flink 使用何种格式将输出写入接收器。有关格式的信息,请参阅 Apache Flink 文档中支持的连接器
用户定义的源和汇
您可以使用现有的 Apache Kafka 连接器向其他 AWS 服务(例如Amazon MSK 和Amazon S3)发送数据。为了与其他数据源和目标进行交互,您可以定义自己的源和接收器。有关更多信息,请参阅 Apache Flink 文档中的用户定义源和接收器