Grave dados usando coletores no Managed Service for Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Grave dados usando coletores no Managed Service for Apache Flink

No código do seu aplicativo, você pode usar qualquer conector de coletor do Apache Flink para gravar em sistemas externos, incluindo AWS serviços, como Kinesis Data Streams e DynamoDB.

O Apache Flink também fornece coletores para arquivos e soquetes, e você pode implementar coletores personalizados. Entre os vários coletores suportados, os seguintes são usados com frequência:

Use streams de dados do Kinesis

O Apache Flink fornece informações sobre o conector do Kinesis Data Streams na documentação do Apache Flink.

Para obter um exemplo de um aplicativo que usa um fluxo de dados Kinesis para entrada e saída, consulte Tutorial: Comece a usar o serviço DataStream API gerenciado para Apache Flink.

Use o Apache Kafka e o Amazon Managed Streaming para Apache Kafka () MSK

O conector Apache Flink Kafka fornece amplo suporte para publicação de dados no Apache Kafka e na Amazon, incluindo garantias de uma única vez. MSK Para aprender a escrever no Kafka, consulte exemplos de conectores Kafka na documentação do Apache Flink.

Use o Amazon S3

É possível utilizar o StreamingFileSink do Apache Flink para gravar objetos em um bucket do Amazon S3.

Para obter um exemplo sobre como gravar objetos no S3, consulte Exemplo: gravação em um bucket do Amazon S3.

Use Firehose

FlinkKinesisFirehoseProducerÉ um coletor Apache Flink confiável e escalável para armazenar a saída do aplicativo usando o serviço Firehose. Esta seção descreve como configurar um projeto do Maven para criar e utilizar um FlinkKinesisFirehoseProducer.

Criar uma FlinkKinesisFirehoseProducer

O exemplo de código a seguir demonstra como criar um FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

Exemplo de código FlinkKinesisFirehoseProducer

O exemplo de código a seguir demonstra como criar, configurar FlinkKinesisFirehoseProducer e enviar dados de um stream de dados do Apache Flink para o serviço Firehose.

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Para obter um tutorial completo sobre como usar o coletor Firehose, consulte. Exemplo: Escrevendo para o Firehose