Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Aggiungi sorgenti di dati di streaming a Managed Service for Apache Flink
Apache Flink fornisce connettori per la lettura da file, socket, raccolte e origini personalizzate. Nel codice dell'applicazione, utilizzi un'origine Apache Flink
Usa i flussi di dati Kinesis
L'origine FlinkKinesisConsumer
fornisce dati di streaming all'applicazione da un flusso di dati Amazon Kinesis.
Creazione di una FlinkKinesisConsumer
Il seguente esempio di codice illustra la creazione di una FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Per ulteriori informazioni sull'utilizzo di una FlinkKinesisConsumer
, consulta Scarica ed esamina il codice Java per lo streaming di Apache Flink.
Creane uno FlinkKinesisConsumer
che utilizzi un consumatore EFO
FlinkKinesisConsumer Ora supporta Enhanced Fan-Out () EFO
Se un utente utilizza KinesisEFO, il servizio Kinesis Data Streams gli fornisce una larghezza di banda dedicata, anziché chiedere al consumatore di condividere la larghezza di banda fissa dello stream con gli altri utenti che leggono dallo stream.
Per ulteriori informazioni sull'utilizzo EFO con il consumatore Kinesis, vedere FLIP-128: Enhanced Fan Out for AWS Kinesis
Puoi abilitare il EFO consumatore impostando i seguenti parametri sul consumatore Kinesis:
RECORD_ PUBLISHER _TYPE: Imposta questo parametro in modo EFOche l'applicazione utilizzi un EFO consumatore per accedere ai dati di Kinesis Data Stream.
EFO_ CONSUMER _NAME: Imposta questo parametro su un valore di stringa che sia unico tra i consumatori di questo flusso. Il riutilizzo di un nome consumatore nello stesso flusso di dati Kinesis causerà l'interruzione del precedente consumatore che utilizzava quel nome.
Per configurare un FlinkKinesisConsumer
da utilizzareEFO, aggiungi i seguenti parametri al consumatore:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
Per un esempio di un'applicazione Managed Service for Apache Flink che utilizza un EFO consumatore, vedi. EFOConsumatore
Usa Amazon MSK
La KafkaSource
fonte fornisce dati di streaming all'applicazione da un MSK argomento di Amazon.
Creazione di una KafkaSource
Il seguente esempio di codice illustra la creazione di una KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Per ulteriori informazioni sull'utilizzo di una KafkaSource
, consulta MSKReplica.