Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Im Apache Flink-Programmiermodell sind Konnektoren Komponenten, die Ihre Anwendung verwendet, um Daten aus externen Quellen, z. B. anderen AWS Diensten, zu lesen oder zu schreiben.
Mit der Apache Flink Table API können Sie die folgenden Arten von Konnektoren verwenden:
Tabellen-API-Quellen: Sie verwenden Tabellen-API-Quellkonnektoren, um Tabellen innerhalb Ihrer
TableEnvironment
zu erstellen, indem Sie entweder API-Aufrufe oder SQL-Abfragen verwenden.Die Tabellen-API sinkt: Sie verwenden SQL-Befehle, um Tabellendaten in externe Quellen wie ein Amazon-MSK-Thema oder einen Amazon-S3-Bucket zu schreiben.
Tabellen-API-Quellen
Sie erstellen eine Tabellenquelle aus einem Datenstrom. Der folgende Code erstellt eine Tabelle aus einem Amazon-MSK-Thema:
//create the table
final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();
//Obtain stream
DataStream<StockRecord> events = env.addSource(consumer);
Table table = streamTableEnvironment.fromDataStream(events);
Weitere Informationen zu Tabellenquellen finden Sie unter Table & SQL Connectors
Die Tabellen-API sinkt
Um Tabellendaten in eine Senke zu schreiben, erstellen Sie die Senke in SQL und führen dann die SQL-basierte Senke für das StreamTableEnvironment
-Objekt aus.
Das folgende Codebeispiel zeigt, wie Tabellendaten in eine Amazon-S3-Senke geschrieben werden:
final String s3Sink = "CREATE TABLE sink_table (" +
"event_time TIMESTAMP," +
"ticker STRING," +
"price DOUBLE," +
"dt STRING," +
"hr STRING" +
")" +
" PARTITIONED BY (ticker,dt,hr)" +
" WITH" +
"(" +
" 'connector' = 'filesystem'," +
" 'path' = '" + s3Path + "'," +
" 'format' = 'json'" +
") ";
//send to s3
streamTableEnvironment.executeSql(s3Sink);
filteredTable.executeInsert("sink_table");
Sie können den format
-Parameter verwenden, um zu steuern, welches Format Managed Service für Apache Flink verwendet, um die Ausgabe in die Senke zu schreiben. Informationen zu Formaten finden Sie unter Unterstützte Konnektoren
Benutzerdefinierte Quellen und Senken
Sie können vorhandene Apache-Kafka-Konnektoren verwenden, um Daten zu und von anderen AWS -Services wie Amazon MSK und Amazon S3 zu senden. Für die Interaktion mit anderen Datenquellen und -zielen können Sie Ihre eigenen Quellen und Senken definieren. Weitere Informationen finden Sie unter Benutzerdefinierte Quellen und Senken in der Apache Flink-Dokumentation