Apache Flink용 관리형 서비스에 스트리밍 데이터 소스를 추가합니다. - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Flink용 관리형 서비스에 스트리밍 데이터 소스를 추가합니다.

Apache Flink는 파일, 소켓, 컬렉션, 맞춤 소스에서 읽을 수 있는 커넥터를 제공합니다. 애플리케이션 코드에서 Apache Flink 소스를 사용하여 스트림으로부터 데이터를 수신합니다. 이 섹션에서는 Amazon 서비스에 사용할 수 있는 소스를 설명합니다.

Kinesis 데이터 스트림 사용

FlinkKinesisConsumer 소스는 Amazon Kinesis 데이터 스트림에서 애플리케이션으로 스트리밍 데이터를 제공합니다.

FlinkKinesisConsumer 생성

다음 코드 예는 FlinkKinesisConsumer를 생성하는 방법을 보여 줍니다.

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

FlinkKinesisConsumer 사용에 대한 자세한 내용을 알아보려면 Apache Flink 스트리밍 Java 코드를 다운로드하여 검토하십시오. 섹션을 참조하세요.

소비자를 FlinkKinesisConsumer 사용하는 광고 만들기 EFO

는 FlinkKinesisConsumer 이제 향상된 팬아웃 () EFO 을 지원합니다.

Kinesis 소비자가 사용하는 EFO 경우 Kinesis Data Streams 서비스는 소비자가 스트림에서 읽는 다른 소비자와 스트림의 고정 대역폭을 공유하지 않고 자체 전용 대역폭을 제공합니다.

Kinesis EFO 소비자와 함께 사용하는 방법에 대한 자세한 내용은 FLIP-128: Kinesis 소비자를 위한 향상된 팬아웃을 참조하십시오. AWS

Kinesis EFO 소비자에 다음 매개변수를 설정하여 소비자를 활성화합니다.

  • RECORD_ PUBLISHER _TYPE: 애플리케이션이 EFOEFO소비자를 사용하여 Kinesis Data Stream 데이터에 액세스하도록 하려면 이 파라미터를 로 설정합니다.

  • EFO_ CONSUMER _NAME: 이 파라미터를 이 스트림의 소비자 간에 고유한 문자열 값으로 설정합니다. 동일한 Kinesis Data Stream에서 컨슈머 명칭을 재사용하면 해당 명칭을 사용하던 이전 컨슈머가 종료됩니다.

FlinkKinesisConsumera를 사용하도록 EFO 구성하려면 소비자에게 다음 매개변수를 추가하십시오.

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

EFO소비자를 사용하는 Apache Flink용 관리 서비스 애플리케이션의 예는 을 참조하십시오. EFO 컨슈머

아마존 사용 MSK

KafkaSource소스는 Amazon MSK 주제의 스트리밍 데이터를 애플리케이션에 제공합니다.

KafkaSource 생성

다음 코드 예는 KafkaSource를 생성하는 방법을 보여 줍니다.

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSource 사용에 대한 자세한 내용을 알아보려면 MSK 복제 섹션을 참조하세요.