

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# DataStream API 구성 요소 검토
<a name="how-datastream"></a>

Apache Flink 애플리케이션은 [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/)를 사용하여 데이터 스트림의 데이터를 변환합니다.

이 섹션에서는 데이터를 이동, 변환, 추적하는 다양한 구성 요소를 설명합니다.
+ [Managed Service for Apache Flink에서 DataStream API를 사용한 데이터 이동 커넥터 사용](how-connectors.md): 이러한 구성 요소는 애플리케이션과 외부 데이터 소스 및 목적지 간에 데이터를 이동합니다.
+ [DataStream API를 사용한 Managed Service for Apache Flink의 연산자를 사용하여 데이터 변환](how-operators.md): 이러한 구성 요소는 응용 프로그램 내의 데이터 요소를 변환하거나 그룹화합니다.
+ [DataStream API를 사용하여 Managed Service for Apache Flink에서 이벤트 추적](how-time.md): 이 항목에서는 Managed Service for Apache Flink가 DataStream API를 사용할 때 이벤트를 추적하는 방법에 대해 설명합니다.

# Managed Service for Apache Flink에서 DataStream API를 사용한 데이터 이동 커넥터 사용
<a name="how-connectors"></a>

Amazon Managed Service for Apache Flink DataStream API에서 *커넥터*는 Managed Service for Apache Flink 애플리케이션에서 데이터를 내보내고 받는 소프트웨어 구성 요소입니다. 커넥터는 파일과 디렉터리에서 읽을 수 있는 유연한 통합 구성 요소입니다. 커넥터는 Amazon 서비스 및 제3자 시스템과 상호 작용하기 위한 완전한 모듈로 구성됩니다.

커넥터 유형은 다음을 포함합니다.
+ [스트리밍 데이터 소스 추가](how-sources.md): Kinesis 데이터 스트림, 파일 또는 기타 데이터 소스에서 애플리케이션에 데이터를 제공합니다.
+ [싱크를 사용하여 데이터 쓰기](how-sinks.md): 애플리케이션에서 Kinesis 데이터 스트림, Firehose 스트림 또는 기타 데이터 대상으로 데이터를 전송합니다.
+ [비동기식 I/O 사용](how-async.md): 스트림 이벤트를 강화하기 위해 데이터 소스(예: 데이터베이스)에 대한 비동기 액세스를 제공합니다.

## 사용 가능한 커넥터
<a name="how-connectors-list"></a>

Apache Flink 프레임워크에는 다양한 소스의 데이터에 액세스하기 위한 커넥터가 포함되어 있습니다. Apache Flink 프레임워크에서 사용할 수 있는 커넥터에 대한 자세한 내용을 알아보려면 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)를 참조하세요.

**주의**  
Flink 1.6, 1.8, 1.11 또는 1.13에서 실행되는 애플리케이션을 중동(UAE), 아시아 태평양(하이데라바드), 이스라엘(텔아비브), 유럽(취리히), 아시아 태평양(멜버른) 또는 아시아 태평양(자카르타) 리전에서 실행하려는 경우 업데이트된 커넥터로 애플리케이션 아카이브를 재구축하거나 Flink 1.18로 업그레이드해야 할 수 있습니다.  
Apache Flink 커넥터는 각 커넥터별로 분리된 오픈 소스 리포지토리에 저장됩니다. 버전 1.18 이상으로 업그레이드하는 경우 종속 항목을 업데이트해야 합니다. Apache Flink AWS 커넥터의 리포지토리에 액세스하려면 [flink-connector-aws](https://github.com/apache/flink-connector-aws)를 참조하세요.  
이전 Kinesis 소스 `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer`는 지원이 중단되었으며 향후 Flink 릴리스에서 제거될 수 있습니다. 대신 [Kinesis 소스](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)를 사용합니다.  
`FlinkKinesisConsumer` 및 `KinesisStreamsSource` 사이에는 상태 호환성이 없습니다. 자세한 내용은 Apache Flink 설명서의 [Kinesis Streams 소스로 기존 작업 마이그레이션](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)을 참조하세요.  
 다음은 권장 가이드라인입니다.  


**커넥터 업그레이드**  

| Flink 버전 | 커넥터 사용 | 해결 방법 | 
| --- | --- | --- | 
| 1.19, 1.20 | Kinesis 소스 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 Kinesis Data Streams 소스 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)를 참조하세요.  | 
| 1.19, 1.20 | Kinesis 싱크 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 Kinesis Data Streams 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Kinesis Streams 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)를 참조하세요.  | 
| 1.19, 1.20 | DynamoDB 스트림 소스 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 DynamoDB 스트림 소스 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon DynamoDB 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)를 참조하세요.  | 
| 1.19, 1.20 | DynamoDB 싱크 | Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 DynamoDB 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon DynamoDB 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)를 참조하세요. | 
| 1.19, 1.20 | Amazon SQS 싱크 |  Managed Service for Apache Flink 버전 1.19 및 1.20으로 업그레이드할 때는 최신 Amazon SQS 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon SQS 싱크](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)를 참조하세요.  | 
| 1.19, 1.20 | Amazon Managed Service for Prometheus 싱크 |  Managed Service for Apache Flink 버전 1.19 및 1.20으로 업그레이드할 때는 최신 Amazon Managed Service for Prometheus 싱크 커넥터를 사용하고 있는지 확인합니다. 1.0.0 이상 버전이어야 합니다. 자세한 내용은 [Prometheus 싱크](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)를 참조하세요.  | 

# Managed Service for Apache Flink에 스트리밍 데이터 소스 추가
<a name="how-sources"></a>

Apache Flink는 파일, 소켓, 컬렉션, 맞춤 소스에서 읽을 수 있는 커넥터를 제공합니다. 애플리케이션 코드에서 [Apache Flink 소스](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)를 사용하여 스트림으로부터 데이터를 수신합니다. 이 섹션에서는 Amazon 서비스에 사용할 수 있는 소스를 설명합니다.

## Kinesis 데이터 스트림 사용
<a name="input-streams"></a>

`KinesisStreamsSource`는 Amazon Kinesis 데이터 스트림에서 애플리케이션으로 스트리밍 데이터를 제공합니다.

### `KinesisStreamsSource` 생성
<a name="input-streams-create"></a>

다음 코드 예는 `KinesisStreamsSource`를 생성하는 방법을 보여 줍니다.

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

`KinesisStreamsSource` 사용에 관한 자세한 내용은 Apache Flink 설명서의 [Amazon Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) 및 [Github에서의 퍼블릭 KinesisConnectors 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)를 참조하세요.

### EFO 소비자를 사용하는 `KinesisStreamsSource` 생성
<a name="input-streams-efo"></a>

`KinesisStreamsSource`는 이제 [향상된 팬아웃(EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)을 지원합니다.

Kinesis 컨슈머가 EFO를 사용하는 경우 Kinesis Data Streams 서비스는 컨슈머가 스트림에서 읽는 다른 컨슈머와 스트림의 고정 대역폭을 공유하지 않고 자체 전용 대역폭을 제공합니다.

Kinesis 소비자와 함께 EFO를 사용하는 방법에 대한 자세한 내용은 [ FLIP-128: AWS Kinesis 소비자를 위한 향상된 팬아웃을 참조하세요](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers).

Kinesis 소비자에 다음 파라미터를 설정하여 EFO 소비자를 활성화합니다.
+ **READER\$1TYPE:** 애플리케이션이 **EFO** 소비자를 사용하여 Kinesis Data Stream 데이터에 액세스하도록 이 파라미터를 EFO로 설정하세요.
+ **EFO\$1CONSUMER\$1NAME: ** 이 파라미터를 이 스트림의 소비자 간에 고유한 문자열 값으로 설정합니다. 동일한 Kinesis Data Stream에서 컨슈머 명칭을 재사용하면 해당 명칭을 사용하던 이전 컨슈머가 종료됩니다.

EFO를 사용하도록 `KinesisStreamsSource`를 구성하려면 컨슈머에 다음 파라미터를 추가하세요.

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

EFO 소비자를 사용하는 Managed Service for Apache Flink 애플리케이션의 예제는 [Github에서의 퍼블릭 Kinesis 커넥터 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)를 참조하세요.

## Amazon MSK 사용
<a name="input-msk"></a>

`KafkaSource` 소스는 Amazon MSK 주제에서 스트리밍 데이터를 애플리케이션에 제공합니다.

### `KafkaSource` 생성
<a name="input-msk-create"></a>

다음 코드 예는 `KafkaSource`를 생성하는 방법을 보여 줍니다.

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

`KafkaSource` 사용에 대한 자세한 내용을 알아보려면 [MSK 복제](earlier.md#example-msk) 섹션을 참조하세요.

# Managed Service for Apache Flink에서 싱크를 사용하여 데이터 쓰기
<a name="how-sinks"></a>

애플리케이션 코드에서는 Kinesis Data Streams 및 DynamoDB 같은 AWS 서비스를 포함한 외부 시스템에 데이터를 쓰기 위해 어떤 [Apache Flink 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) 커넥터든 사용할 수 있습니다.

Apache Flink는 파일과 소켓용 싱크를 제공하며 사용자 지정 싱크를 구현할 수도 있습니다. 지원되는 여러 싱크 중 다음 싱크가 자주 사용됩니다.

## Kinesis 데이터 스트림 사용
<a name="sinks-streams"></a>

Apache Flink는 Apache Flink 설명서에서 [Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)에 대한 정보를 제공합니다.

입력 및 출력에 Kinesis 데이터 스트림을 사용하는 애플리케이션의 예는 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 섹션을 참조하세요.

## Apache Kafka 및 Amazon Managed Streaming for Apache Kafka(MSK) 사용
<a name="sinks-MSK"></a>

[Apache Flink Kafka 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)는 Apache Kafka와 Amazon MSK로 데이터를 게시하기 위한 광범위한 기능을 제공하며, 정확히 한 번 처리 보장도 지원합니다. Kafka에 데이터를 쓰는 방법은 Apache Flink 설명서의 [Kafka 커넥터 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)를 참조하세요.

## Amazon S3 사용
<a name="sinks-s3"></a>

Amazon S3 버킷에 객체를 쓰는 데 Apache Flink `StreamingFileSink`를 사용할 수 있습니다.

S3에 객체를 쓰는 방법에 대한 예는 [예: Amazon S3 버킷에 쓰기](earlier.md#examples-s3) 섹션을 참조하세요.

## Firehose 사용
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`는 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 서비스를 사용하여 애플리케이션 출력을 저장하기 위한 안정적이고 확장 가능한 Apache Flink 싱크입니다. 이 섹션에서는 Maven 프로젝트를 설정하여 `FlinkKinesisFirehoseProducer`를 생성하고 사용하는 방법을 설명합니다.

**Topics**
+ [`FlinkKinesisFirehoseProducer` 생성](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 코드 예](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` 생성
<a name="sinks-firehose-create"></a>

다음 코드 예는 `FlinkKinesisFirehoseProducer`를 생성하는 방법을 보여 줍니다.

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 코드 예
<a name="sinks-firehose-sample"></a>

다음 코드 예제는 `FlinkKinesisFirehoseProducer`를 생성 및 구성하고 Apache Flink 데이터 스트림에서 Firehose 서비스로 데이터를 전송하는 방법을 보여줍니다.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose 싱크를 사용하는 방법에 대한 전체 자습서는 [예제: Firehose에 쓰기](earlier.md#get-started-exercise-fh) 섹션을 참조하세요.

# Managed Service for Apache Flink에서 비동기식 I/O 사용
<a name="how-async"></a>

비동기식 I/O 연산자는 데이터베이스와 같은 외부 데이터 소스를 사용하여 스트림 데이터를 강화합니다. Managed Service for Apache Flink는 스트림 이벤트를 비동기적으로 보강하여 요청을 일괄 처리하여 효율성을 높일 수 있도록 합니다.

자세한 내용을 알아보려면 Apache Flink 설명서의 [비동기식 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) 섹션을 참조하세요.

# DataStream API를 사용한 Managed Service for Apache Flink의 연산자를 사용하여 데이터 변환
<a name="how-operators"></a>

Managed Service for Apache Flink에서 들어오는 데이터를 변환하려면 Apache Flink *연산자*를 사용합니다. Apache Flink 연산자는 하나 이상의 데이터 스트림을 새 데이터 스트림으로 변환합니다. 새 데이터 스트림에는 원래 데이터 스트림에서 수정된 데이터가 포함됩니다. Apache Flink는 25개 이상의 사전 빌드된 스트림 처리 연산자를 제공합니다. 자세한 내용은 Apache Flink 설명서의 [연산자](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)를 참조하세요.

**Topics**
+ [변환 연산자 사용](#how-operators-transform)
+ [집계 연산자 사용](#how-operators-agg)

## 변환 연산자 사용
<a name="how-operators-transform"></a>

다음은 JSON 데이터 스트림의 필드 중 하나에 대한 간단한 텍스트 변환의 예입니다.

이 코드는 변환된 데이터 스트림을 만듭니다. 새 데이터 스트림에는 원래 스트림과 동일한 데이터가 포함되며 `TICKER` 필드 내용에 “` Company`” 문자열이 추가됩니다.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 집계 연산자 사용
<a name="how-operators-agg"></a>

다음은 집계 연산자의 예시입니다. 코드는 집계된 데이터 스트림을 만듭니다. 연산자는 5초짜리 텀블링 윈도우를 만들고 창에 있는 레코드에 대한 `PRICE` 값의 합계를 같은 `TICKER` 값으로 반환합니다.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

더 많은 코드 예제는 [Managed Service for Apache Flink 애플리케이션 생성 및 사용 예제](examples-collapsibles.md) 섹션을 참조하세요.

# DataStream API를 사용하여 Managed Service for Apache Flink에서 이벤트 추적
<a name="how-time"></a>

Managed Service for Apache Flink는 다음 타임스탬프를 사용하여 이벤트를 추적합니다.
+ **처리 시간:** 각 작업을 실행 중인 시스템의 시스템 시간을 나타냅니다.
+ **이벤트 시간:** 각 개별 이벤트가 생성 장치에서 발생한 시간을 나타냅니다.
+ **수집 시간:** 이벤트가 Managed Service for Apache Flink에 입력되는 시간을 나타냅니다.

`setStreamTimeCharacteristic`을 사용하여 스트리밍 환경에서 사용되는 시간을 설정합니다.

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

타임스탬프에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [워터마크 생성](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)을 참조하세요.