Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
No código do aplicativo, você pode usar qualquer conector de coletor Apache Flink
O Apache Flink também fornece coletores para arquivos e soquetes, e você pode implementar coletores personalizados. Entre os vários coletores suportados, os seguintes são usados com frequência:
Use streams de dados do Kinesis
O Apache Flink fornece informações sobre o conector do Kinesis Data Streams
Para obter um exemplo de um aplicativo que usa um fluxo de dados Kinesis para entrada e saída, consulte Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink.
Use o Apache Kafka e o Amazon Managed Streaming para Apache Kafka (MSK)
O conector Apache Flink Kafka
Use o Amazon S3
É possível utilizar o StreamingFileSink
do Apache Flink para gravar objetos em um bucket do Amazon S3.
Para obter um exemplo sobre como gravar objetos no S3, consulte Exemplo: gravação em um bucket do Amazon S3.
Use Firehose
FlinkKinesisFirehoseProducer
É um coletor Apache Flink confiável e escalável para armazenar a saída do aplicativo usando o serviço Firehose. Esta seção descreve como configurar um projeto do Maven para criar e utilizar um FlinkKinesisFirehoseProducer
.
Criar uma FlinkKinesisFirehoseProducer
O exemplo de código a seguir demonstra como criar um FlinkKinesisFirehoseProducer
:
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
Exemplo de código FlinkKinesisFirehoseProducer
O exemplo de código a seguir demonstra como criar, configurar FlinkKinesisFirehoseProducer
e enviar dados de um stream de dados do Apache Flink para o serviço Firehose.
package com.amazonaws.services.kinesisanalytics;
import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
public class StreamingJob {
private static final String region = "us-east-1";
private static final String inputStreamName = "ExampleInputStream";
private static final String outputStreamName = "ExampleOutputStream";
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
}
private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
throws IOException {
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
applicationProperties.get("ConsumerConfigProperties")));
}
private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
/*
* com.amazonaws.services.kinesisanalytics.flink.connectors.config.
* ProducerConfigConstants
* lists of all of the properties that firehose sink can be configured with.
*/
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
new SimpleStringSchema(), outputProperties);
ProducerConfigConstants config = new ProducerConfigConstants();
return sink;
}
private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
/*
* com.amazonaws.services.kinesisanalytics.flink.connectors.config.
* ProducerConfigConstants
* lists of all of the properties that firehose sink can be configured with.
*/
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
new SimpleStringSchema(),
applicationProperties.get("ProducerConfigProperties"));
return sink;
}
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
* if you would like to use runtime configuration properties, uncomment the
* lines below
* DataStream<String> input = createSourceFromApplicationProperties(env);
*/
DataStream<String> input = createSourceFromStaticConfig(env);
// Kinesis Firehose sink
input.addSink(createFirehoseSinkFromStaticConfig());
// If you would like to use runtime configuration properties, uncomment the
// lines below
// input.addSink(createFirehoseSinkFromApplicationProperties());
env.execute("Flink Streaming Java API Skeleton");
}
}
Para obter um tutorial completo sobre como usar o coletor Firehose, consulte. Exemplo: Escrevendo para o Firehose