Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Schreiben Sie Daten mithilfe von Senken in Managed Service für Apache Flink
In Ihrem Anwendungscode können Sie jeden Apache Flink-Sink-Connector
Apache Flink bietet auch Senken für Dateien und Sockets, und Sie können benutzerdefinierte Senken implementieren. Unter den verschiedenen unterstützten Senken werden häufig die folgenden verwendet:
Verwenden Sie Kinesis-Datenstreams
Apache Flink bietet Informationen zum Kinesis Data Streams-Konnector
Ein Beispiel für eine Anwendung, die einen Kinesis Data Stream für Eingabe und Ausgabe verwendet, finden Sie unter Tutorial: Erste Schritte mit dem DataStream API integrierten Managed Service für Apache Flink.
Verwenden Sie Apache Kafka und Amazon Managed Streaming for Apache Kafka () MSK
Der Apache Flink Kafka Connector
Verwenden Sie Amazon S3
Sie können Apache Flink StreamingFileSink
verwenden, um Objekte in einen Amazon S3-Bucket zu schreiben.
Ein Beispiel dafür, wie man Objekte in S3 schreibt, finden Sie unter Beispiel: In einen Amazon S3 S3-Bucket schreiben.
Benutze Firehose
Das FlinkKinesisFirehoseProducer
ist eine zuverlässige, skalierbare Apache Flink-Senke zum Speichern von Anwendungsausgaben mithilfe des Firehose-Dienstes. In diesem Abschnitt wird die Einrichtung eines Maven-Projekts beschrieben, um einen FlinkKinesisFirehoseProducer
zu erstellen und zu verwenden.
Erstellen eines FlinkKinesisFirehoseProducer
Das folgende Code-Beispiel zeigt das Erstellen eines FlinkKinesisFirehoseProducer
:
Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
FlinkKinesisFirehoseProducer
-Codebeispiel
Das folgende Codebeispiel zeigt, wie Sie einen Apache Flink-Datenstream erstellen FlinkKinesisFirehoseProducer
und konfigurieren und Daten von diesem an den Firehose senden.
package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }
Ein vollständiges Tutorial zur Verwendung der Firehose-Spüle finden Sie unterBeispiel: An Firehose schreiben.