Managed Service for Apache Flink에 스트리밍 데이터 소스 추가 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Managed Service for Apache Flink에 스트리밍 데이터 소스 추가

Apache Flink는 파일, 소켓, 컬렉션, 맞춤 소스에서 읽을 수 있는 커넥터를 제공합니다. 애플리케이션 코드에서 Apache Flink 소스를 사용하여 스트림으로부터 데이터를 수신합니다. 이 섹션에서는 Amazon 서비스에 사용할 수 있는 소스를 설명합니다.

Kinesis 데이터 스트림 사용

KinesisStreamsSource 소스는 Amazon Kinesis 데이터 스트림에서 애플리케이션으로 스트리밍 데이터를 제공합니다.

KinesisStreamsSource 생성

다음 코드 예는 KinesisStreamsSource를 생성하는 방법을 보여 줍니다.

// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();

사용에 대한 자세한 내용은 Apache Flink 설명서의 Amazon Kinesis Data Streams Connector와 Github의 공개 예제를 KinesisStreamsSource참조하세요. KinesisConnectors

EFO 소비자를 KinesisStreamsSource 사용하는 생성

KinesisStreamsSource 이제 는 향상된 팬아웃(EFO)을 지원합니다.

Kinesis 소비자가 를 사용하는 경우 EFOKinesis Data Streams 서비스는 소비자가 스트림에서 읽는 다른 소비자와 스트림의 고정 대역폭을 공유하지 않고 자체 전용 대역폭을 제공합니다.

Kinesis 소비자와 EFO 함께 를 사용하는 방법에 대한 자세한 내용은 FLIP-128: AWS Kinesis 소비자를 위한 향상된 팬 아웃을 참조하세요.

Kinesis EFO 소비자에서 다음 파라미터를 설정하여 소비자를 활성화합니다.

  • READER_TYPE: EFO 소비자가 Kinesis Data Stream 데이터에 액세스하도록 애플리케이션에 EFO 대해 이 파라미터를 로 설정합니다.

  • EFO_CONSUMER_NAME: 이 파라미터를 이 스트림의 소비자 간에 고유한 문자열 값으로 설정합니다. 동일한 Kinesis Data Stream에서 컨슈머 명칭을 재사용하면 해당 명칭을 사용하던 이전 컨슈머가 종료됩니다.

를 사용하도록 KinesisStreamsSource를 구성하려면 소비자에게 다음 파라미터를 EFO추가합니다.

sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");

EFO 소비자를 사용하는 Managed Service for Apache Flink 애플리케이션의 예는 Github의 퍼블릭 Kinesis Connectors 예제를 참조하세요.

Amazon 사용 MSK

KafkaSource 소스는 Amazon MSK 주제에서 애플리케이션에 스트리밍 데이터를 제공합니다.

KafkaSource 생성

다음 코드 예는 KafkaSource를 생성하는 방법을 보여 줍니다.

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSource 사용에 대한 자세한 내용을 알아보려면 MSK 복제 섹션을 참조하세요.