

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink 애플리케이션 생성 및 실행
<a name="get-started-exercise"></a>

이 단계에서는 Kinesis 데이터 스트림을 소스 및 싱크로 사용하여 Managed Service for Apache Flink 애플리케이션을 만듭니다.

**Topics**
+ [종속 리소스 생성](#get-started-exercise-0)
+ [로컬 개발 환경 설정](#get-started-exercise-2)
+ [Apache Flink 스트리밍 Java 코드 다운로드 및 검사](#get-started-exercise-5)
+ [샘플 레코드를 입력 스트림에 쓰기](#get-started-exercise-5-4)
+ [로컬에서 애플리케이션 실행](#get-started-exercise-5-run)
+ [Kinesis 스트림의 입력 및 출력 데이터 관찰](#get-started-exercise-input-output)
+ [로컬에서 실행 중인 애플리케이션 중지](#get-started-exercise-stop)
+ [애플리케이션 코드 컴파일 및 패키징](#get-started-exercise-5-5)
+ [애플리케이션 코드 JAR 파일 업로드](#get-started-exercise-6)
+ [Managed Service for Apache Flink 애플리케이션 생성 및 구성](#get-started-exercise-7)
+ [다음 단계](#get-started-exercise-next-step-4)

## 종속 리소스 생성
<a name="get-started-exercise-0"></a>

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 다음과 같은 종속 리소스를 생성해야 합니다.
+ 입력 및 출력을 위한 Kinesis 데이터 스트림 2개.
+ 애플리케이션 코드를 저장할 Amazon S3 버킷
**참고**  
이 자습서에서는 us-east-1 미국 동부(버지니아 북부) 리전에 애플리케이션을 배포한다고 가정합니다. 다른 리전을 사용하는 경우 모든 단계를 해당 리전에 맞게 조정합니다.

### 2개의 Amazon Kinesis 데이터 스트림 생성
<a name="get-started-exercise-1"></a>

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 두 개의 Kinesis 데이터 스트림(`ExampleInputStream` 및 `ExampleOutputStream`)을 생성하세요. 이 애플리케이션은 애플리케이션 소스 및 대상 스트림에 대해 이러한 스트림을 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 생성할 수 있습니다. 콘솔 지침은 *Amazon Kinesis Data Streams 개발자 가이드*의 [데이터 스트림 생성 및 업데이트](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)를 참조하세요. 를 사용하여 스트림을 생성하려면 다음 명령을 AWS CLI사용하여 애플리케이션에 사용하는 리전에 맞게 조정합니다.

**데이터 스트림 (AWS CLI)을 생성하려면**

1. 첫 번째 스트림(`ExampleInputStream`)을 생성하려면 다음 Amazon Kinesis `create-stream` AWS CLI 명령을 사용합니다.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. 애플리케이션에서 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 `ExampleOutputStream`으로 변경합니다.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### 애플리케이션 코드를 위한 Amazon S3 버킷 생성
<a name="get-started-exercise-1-5"></a>

콘솔을 사용하여 Amazon S3 버킷을 생성할 수 있습니다. 콘솔을 사용하여 Amazon S3 버킷을 생성하는 방법을 알아보려면 [Amazon S3 사용 설명서](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)의 [버킷 생성](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)을 참조하세요. Amazon S3 버킷 이름은 전역적으로 고유한 명칭을 지정해야 합니다. 예를 들어 로그인 이름을 추가하여 고유하게 만들 수 있습니다.

**참고**  
 이 자습서에서 사용하는 리전(us-east-1)에 버킷을 생성해야 합니다.

### 기타 리소스
<a name="get-started-exercise-1-6"></a>

애플리케이션을 생성할 때 Managed Service for Apache Flink는 다음과 같은 Amazon CloudWatch 리소스를 자동으로 성합니다(아직 존재하지 않는 경우).
+ `/AWS/KinesisAnalytics-java/<my-application>`라는 로그 그룹.
+ `kinesis-analytics-log-stream`라는 로그 스트림.

## 로컬 개발 환경 설정
<a name="get-started-exercise-2"></a>

개발 및 디버깅을 위해 선택한 IDE에서 Apache Flink 애플리케이션을 로컬 머신에서 직접 실행할 수 있습니다. Apache Flink 종속성은 Apache Maven을 사용하는 일반 Java 종속성과 동일하게 처리됩니다.

**참고**  
로컬 개발 머신에는 Java JDK 11, Maven, Git이 설치되어 있어야 합니다. [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 또는 [IntelliJ IDEA](https://www.jetbrains.com/idea/)와 같은 개발 환경을 사용하는 것이 좋습니다. 모든 사전 조건 충족 여부를 확인하려면 [연습 완료를 위한 필수 조건 충족](getting-started.md#setting-up-prerequisites) 섹션을 참조하세요. 로컬 머신에 Apache Flink 클러스터를 설치할 필요는 **없습니다**.

### AWS 세션 인증
<a name="get-started-exercise-2-5"></a>

애플리케이션이 Kinesis 데이터 스트림을 사용하여 데이터를 게시합니다. 로컬에서 실행하는 경우 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는 유효한 AWS 인증된 세션이 있어야 합니다. 다음 단계에 따라 세션을 인증합니다.

1.  AWS CLI 유효한 자격 증명이 구성된 및 명명된 프로파일이 없는 경우 섹션을 참조하세요[AWS Command Line Interface (AWS CLI) 설정](setup-awscli.md).

1. 다음 테스트 레코드를 게시하여 AWS CLI 가 올바르게 구성되었고 사용자에게 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는지 확인합니다.

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. IDE에 통합할 플러그인이 있는 경우 AWS이를 사용하여 IDE에서 실행 중인 애플리케이션에 자격 증명을 전달할 수 있습니다. 자세한 내용은 [IntelliJ IDEA용AWS 툴킷](https://aws.amazon.com/intellij/) 및 [Eclipse용AWS 툴킷](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)을 참조하세요.

## Apache Flink 스트리밍 Java 코드 다운로드 및 검사
<a name="get-started-exercise-5"></a>

이 예에 대한 Java 애플리케이션 코드는 GitHub에서 사용할 수 있습니다. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

1. 다음 명령을 사용하여 원격 리포지토리를 복제합니다.

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` 디렉터리로 이동합니다.

### 애플리케이션 구성 요소 검토
<a name="get-started-exercise-5-1"></a>

애플리케이션은 `com.amazonaws.services.msf.BasicStreamingJob` 클래스에 모두 구현되어 있습니다. `main()` 메서드는 스트리밍 데이터를 처리하고 실행하기 위한 데이터 흐름을 정의합니다.

**참고**  
최적화된 개발자 경험을 위해 이 애플리케이션은 Amazon Managed Service for Apache Flink와 IDE의 로컬 개발 환경 모두에서 코드 변경 없이 실행되도록 설계되었습니다.
+ Amazon Managed Service for Apache Flink에서 실행할 때와 IDE에서 실행할 때 모두 동작하도록 런타임 구성을 읽기 위해 애플리케이션은 IDE에서 로컬로 독립형으로 실행되고 있는지를 자동으로 감지합니다. 이 경우 애플리케이션은 다음과 같이 런타임 구성을 다르게 로드합니다.

  1. 애플리케이션이 IDE에서 독립 실행 모드로 실행되고 있음을 감지하면, 프로젝트의 **resources** 폴더에 포함된 `application_properties.json` 파일을 생성합니다. 파일 내용은 다음과 같습니다.

  1. 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때는, 기본 동작으로 Amazon Managed Service for Apache Flink 애플리케이션에서 정의하게 될 런타임 속성에서 애플리케이션 구성을 로드합니다. [Managed Service for Apache Flink 애플리케이션 생성 및 구성](#get-started-exercise-7)을(를) 참조하세요.

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 메서드는 애플리케이션 데이터 흐름을 정의하고 실행합니다.
  + 기본 스트리밍 환경을 초기화합니다. 이 예제에서는 DataSteam API에서 사용할 `StreamExecutionEnvironment`와 SQL 및 Table API에서 사용할 `StreamTableEnvironment`를 모두 생성하는 방법을 보여줍니다. 두 환경 객체는 동일한 런타임 환경에 대한 별도의 참조이며, 서로 다른 API를 사용하기 위한 것입니다.

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + 애플리케이션 구성 파라미터를 로드합니다. 이렇게 하면 애플리케이션이 실행되는 위치에 따라 올바른 위치에서 구성 파라미터를 자동으로 로드합니다.

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 애플리케이션은 입력 스트림에서 데이터를 읽기 위해 [Kinesis 소비자](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) 커넥터를 사용하여 소스를 정의합니다. 입력 스트림의 구성은 `PropertyGroupId`=`InputStream0`에 정의되어 있습니다. 스트림의 이름과 리전은 각각 `stream.name` 및 `aws.region`이라는 속성에 정의되어 있습니다. 단순화를 위해, 이 소스는 레코드를 문자열로 읽습니다.

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 애플리케이션은 출력 스트림으로 데이터를 전송하기 위해 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) 커넥터를 사용하여 싱크를 정의합니다. 출력 스트림의 이름과 리전은 입력 스트림과 마찬가지로 `PropertyGroupId`=`OutputStream0`에 정의되어 있습니다. 싱크는 소스에서 데이터를 가져오는 내부 `DataStream`에 직접 연결됩니다. 실제 애플리케이션에서는 소스와 싱크 사이에 일부 변환 과정이 존재합니다.

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 마지막으로 방금 정의한 데이터 흐름을 실행합니다. 이는 데이터 흐름에 필요한 모든 연산자를 정의한 후 `main()` 메서드의 마지막 명령이어야 합니다.

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### pom.xml 파일 사용
<a name="get-started-exercise-5-2"></a>

pom.xml 파일은 애플리케이션에서 필요한 모든 종속성을 정의하고, Flink에서 필요한 모든 종속성을 포함하는 fat-jar를 빌드하기 위해 Maven Shade 플러그인을 설정합니다.
+ 일부 종속성은 `provided` 범위를 가집니다. 이러한 종속성은 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때 자동으로 제공됩니다. 이는 애플리케이션을 컴파일하거나 IDE에서 로컬로 실행하는 데 필요합니다. 자세한 내용은 [로컬에서 애플리케이션 실행](#get-started-exercise-5-run) 단원을 참조하십시오. Amazon Managed Service for Apache Flink에서 사용할 런타임과 동일한 Flink 버전을 사용해야 합니다.

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 이 애플리케이션에서 사용하는 [Kinesis 커넥터](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)와 같이 Apache Flink 종속성을 pom에 기본 범위로 추가해야 합니다. 자세한 내용은 [Apache Flink 커넥터 사용](how-flink-connectors.md) 단원을 참조하십시오. 애플리케이션에 필요한 Java 종속성도 추가할 수 있습니다.

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java Compiler 플러그인은 코드가 Apache Flink에서 현재 지원하는 JDK 버전인 Java 11로 컴파일되도록 보장합니다.
+ Maven Shade 플러그인은 fat-jar를 패키징하며 이때 런타임에서 제공하는 일부 라이브러리는 제외합니다. 또한 `ServicesResourceTransformer` 및 `ManifestResourceTransformer`라는 두 가지 트랜스포머를 지정합니다. 후자는 애플리케이션을 시작하기 위해 `main` 메서드를 포함하는 클래스를 구성합니다. 기본 클래스 이름을 변경하는 경우 이 트랜스포머도 업데이트해야 합니다.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 샘플 레코드를 입력 스트림에 쓰기
<a name="get-started-exercise-5-4"></a>

이 섹션에서는 애플리케이션이 처리할 수 있도록 스트림에 샘플 레코드를 전송합니다. 샘플 데이터를 생성하는 방법은 Python 스크립트를 사용하거나 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)를 사용하는 두 가지 옵션이 있습니다.

### Python 스크립트를 사용하여 샘플 데이터 생성
<a name="get-started-exercise-5-4-1"></a>

Python 스크립트를 사용하여 스트림으로 샘플 레코드를 전송할 수 있습니다.

**참고**  
이 Python 스크립트를 실행하려면 Python 3.x를 사용해야 하며 [AWS SDK for Python (Boto)](https://aws.amazon.com/developer/language/python/) 라이브러리가 설치되어 있어야 합니다.

**Kinesis 입력 스트림으로 테스트 데이터 전송을 시작하려면:**

1. 데이터 생성기 `stock.py` Python 스크립트를 [Data generator GitHub repository](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)에서 다운로드합니다.

1. `stock.py` 스크립트를 실행합니다.

   ```
   $ python stock.py
   ```

자습서의 나머지 단계를 완료하는 동안 스크립트를 계속 실행한 상태로 유지합니다. 이제 Apache Flink 애플리케이션을 실행할 수 있습니다.

### Kinesis Data Generator를 사용하여 샘플 데이터 생성
<a name="get-started-exercise-5-4-2"></a>

Python 스크립트를 사용하는 대신, [호스팅 버전](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)으로도 제공되는 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)를 사용하여 스트림으로 임의의 샘플 데이터를 전송할 수 있습니다. Kinesis Data Generator는 브라우저에서 실행되며, 로컬 머신에 아무것도 설치할 필요가 없습니다.

**Kinesis Data Generator를 설정하고 실행하려면:**

1. [Kinesis Data Generator 설명서](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)의 지침에 따라 도구에 대한 액세스를 설정합니다. 사용자와 암호를 설정하는 CloudFormation 템플릿을 실행합니다.

1. CloudFormation 템플릿에서 생성된 URL을 통해 Kinesis Data Generator에 액세스합니다. CloudFormation 템플릿이 완료되면 **출력** 탭에서 URL을 확인할 수 있습니다.

1. 데이터 생성기를 구성합니다.
   + **리전:** 이 자습서에서 사용 중인 리전(us-east-1)을 선택합니다.
   + **스트림/전송 스트림:** 애플리케이션이 사용할 입력 스트림을 선택합니다. 예: `ExampleInputStream`
   + **초당 레코드 수:** 100
   + **레코드 템플릿:** 다음 템플릿을 복사하여 붙여 넣습니다.

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 템플릿 테스트: **템플릿 테스트**를 선택하고 생성된 레코드가 다음과 유사한지 확인합니다.

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 데이터 생성기 시작: **데이터 전송 선택**을 선택합니다.

Kinesis Data Generator가 이제 `ExampleInputStream`으로 데이터를 전송하고 있습니다.

## 로컬에서 애플리케이션 실행
<a name="get-started-exercise-5-run"></a>

IDE에서 Flink 애플리케이션을 로컬로 실행하고 디버깅할 수 있습니다.

**참고**  
계속하기 전에 입력 스트림과 출력 스트림 사용이 가능한지 확인합니다. [2개의 Amazon Kinesis 데이터 스트림 생성](#get-started-exercise-1)을(를) 참조하세요. 또한 두 스트림에 대해 읽기 및 쓰기 권한이 있는지 확인합니다. [AWS 세션 인증](#get-started-exercise-2-5)을(를) 참조하세요.  
로컬 개발 환경을 설정하려면 Java 11 JDK, Apache Maven 및 Java 개발용 IDE가 필요합니다. 필수 사전 조건을 충족하는지 확인합니다. [연습 완료를 위한 필수 조건 충족](getting-started.md#setting-up-prerequisites)을(를) 참조하세요.

### Java 프로젝트를 IDE로 가져오기
<a name="get-started-exercise-5-run-1"></a>

IDE에서 애플리케이션 작업을 시작하려면 이를 Java 프로젝트로 가져와야 합니다.

복제한 리포지토리에는 여러 예제가 포함되어 있습니다. 각 예제는 별도의 프로젝트입니다. 이 자습서에서는 `./java/GettingStarted` 하위 디렉터리의 내용을 IDE로 가져옵니다.

코드를 Maven을 사용하는 기존 Java 프로젝트로 삽입합니다.

**참고**  
새 Java 프로젝트를 가져오는 정확한 프로세스는 사용 중인 IDE에 따라 다릅니다.

### 로컬 애플리케이션 구성 확인
<a name="get-started-exercise-5-run-2"></a>

로컬에서 실행할 때 애플리케이션은 `./src/main/resources` 경로 아래 프로젝트의 리소스 폴더에 있는 `application_properties.json` 파일의 구성을 사용합니다. Kinesis 스트림 이름이나 리전을 변경하려면 이 파일을 편집할 수 있습니다.

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### IDE 실행 구성 설정
<a name="get-started-exercise-5-run-3"></a>

IDE에서 `com.amazonaws.services.msf.BasicStreamingJob` 기본 클래스를 실행하여 Flink 애플리케이션을 Java 애플리케이션처럼 직접 실행하고 디버깅할 수 있습니다. 애플리케이션을 실행하기 전에 실행 구성을 설정해야 합니다. 설정은 사용 중인 IDE에 따라 다릅니다. 예를 들어 IntelliJ IDEA 설명서의 [구성 요소 실행 및 디버깅](https://www.jetbrains.com/help/idea/run-debug-configuration.html)을 참조하세요. 특히 다음을 설정해야 합니다.

1. **클래스 경로에 `provided` 종속성을 추가합니다**. 이는 로컬에서 실행할 때 `provided` 범위의 종속성이 애플리케이션에 전달되도록 하는 데 필요합니다. 이 설정이 없으면 애플리케이션은 즉시 `class not found` 오류를 표시합니다.

1. **자격 AWS 증명을 전달하여 Kinesis 스트림에 액세스합니다**. 가장 빠른 방법은 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)를 사용하는 것입니다. 실행 구성에서이 IDE 플러그인을 사용하여 특정 AWS 프로필을 선택할 수 있습니다. AWS 인증은이 프로필을 사용하여 수행됩니다. 따라서 AWS 자격 증명을 직접 전달할 필요가 없습니다.

1. IDE가 **JDK 11**을 사용하여 애플리케이션을 실행하는지 확인합니다.

### IDE에서 애플리케이션 실행
<a name="get-started-exercise-5-run-4"></a>

`BasicStreamingJob`에 대한 실행 구성을 설정한 후에는 일반 Java 애플리케이션처럼 실행하거나 디버깅할 수 있습니다.

**참고**  
Maven이 생성한 fat-jar는 명령줄에서 `java -jar ...`로 직접 실행할 수 없습니다. 이 jar에는 애플리케이션을 독립적으로 실행하는 데 필요한 Flink 코어 종속성이 포함되어 있지 않습니다.

애플리케이션이 성공적으로 시작되면 독립 실행형 미니클러스터 및 커넥터 초기화에 대한 일부 정보를 로그로 기록합니다. 그 이후에는 애플리케이션이 시작될 때 Flink가 일반적으로 출력하는 여러 INFO 로그와 일부 WARN 로그가 이어집니다.

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

초기화가 완료되면 애플리케이션은 더 이상 로그 항목을 출력하지 않습니다. **데이터가 흐르는 동안에는 로그가 출력되지 않습니다.**

애플리케이션이 데이터를 올바르게 처리하고 있는지 확인하려면 다음 섹션에 설명된 대로 입력 및 출력 Kinesis 스트림을 검사할 수 있습니다.

**참고**  
 데이터 흐름에 대한 로그를 출력하지 않는 것은 Flink 애플리케이션의 정상적인 동작입니다. 레코드마다 로그를 출력하면 디버깅에는 유용할 수 있지만 프로덕션 환경에서 실행할 때는 상당한 오버헤드를 초래할 수 있습니다.

## Kinesis 스트림의 입력 및 출력 데이터 관찰
<a name="get-started-exercise-input-output"></a>

Python(샘플 데이터를 생성하는 Python) 또는 Kinesis Data Generator(링크)을 사용하여 입력 스트림으로 전송된 레코드를 Amazon Kinesis 콘솔의 **데이터 뷰어**에서 관찰할 수 있습니다.

**레코드를 관찰하려면**

1. [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. 이 자습서를 실행 중인 리전이 기본값인 us-east-1 미국 동부(버지니아 북부)와 동일한지 확인합니다. 일치하지 않으면 리전을 변경합니다.

1. **데이터 스트림**을 선택합니다.

1. `ExampleInputStream` 또는 `ExampleOutputStream.` 중에서 관찰할 스트림을 선택합니다.

1. **데이터 뷰어** 탭을 선택합니다.

1. **샤드**를 선택하고 **시작 위치**를 **최신**으로 유지한 다음 **레코드 가져오기**를 선택합니다. ‘이 요청에 대한 레코드를 찾을 수 없음’ 오류가 표시될 수 있습니다. 그렇다면 **레코드 가져오기 재시도**를 선택합니다. 스트림에 게시된 최신 레코드가 표시됩니다.

1. 데이터 열의 값을 선택하여 레코드 내용을 JSON 형식으로 확인합니다.

## 로컬에서 실행 중인 애플리케이션 중지
<a name="get-started-exercise-stop"></a>

IDE에서 실행 중인 애플리케이션을 중지합니다. IDE는 일반적으로 ‘중지’ 옵션을 제공합니다. 정확한 위치와 방법은 사용 중인 IDE에 따라 다릅니다.

## 애플리케이션 코드 컴파일 및 패키징
<a name="get-started-exercise-5-5"></a>

이 섹션에서는 Apache Maven을 사용하여 Java 코드를 컴파일하고 JAR 파일로 패키징합니다. Maven 명령줄 도구 또는 IDE를 사용하여 코드를 컴파일하고 패키징할 수 있습니다.

**Maven 명령줄을 사용하여 컴파일 및 패키징하려면:**

Java GettingStarted 프로젝트가 포함된 디렉터리로 이동하여 다음 명령을 실행합니다.

```
$ mvn package
```

**IDE를 사용하여 컴파일 및 패키징하려면:**

IDE Maven 통합 기능을 사용하여 `mvn package`를 실행합니다.

두 경우 다음 JAR 파일이 생성됩니다. `target/amazon-msf-java-stream-app-1.0.jar` 

**참고**  
 IDE에서 ‘프로젝트 빌드’를 실행해도 JAR 파일이 생성되지 않을 수 있습니다.

## 애플리케이션 코드 JAR 파일 업로드
<a name="get-started-exercise-6"></a>

이 섹션에서는 이전 섹션에서 생성한 JAR 파일을 자습서 시작 부분에서 생성한 Amazon Simple Storage Service(Amazon S3) 버킷에 업로드합니다. 이 단계를 아직 완료하지 않았으면 (링크) 섹션을 참조하세요.

**애플리케이션 코드 JAR 파일을 업로드하려면**

1. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)에서 S3 콘솔을 엽니다.

1. 애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다.

1. **업로드**를 선택합니다.

1. **Add Files**를 선택합니다.

1. 이전 단계 `target/amazon-msf-java-stream-app-1.0.jar`에서 생성한 JAR 파일로 이동합니다.

1. 다른 설정을 변경하지 않고 **업로드**를 선택합니다.

**주의**  
`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar`에서 반드시 올바른 JAR 파일을 선택해야 합니다.  
`target` 디렉터리에는 업로드할 필요가 없는 다른 JAR 파일도 포함되어 있습니다.

## Managed Service for Apache Flink 애플리케이션 생성 및 구성
<a name="get-started-exercise-7"></a>

콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 실행할 수 있습니다. 이 자습서에서는 콘솔을 사용합니다.

**참고**  
콘솔을 사용하여 애플리케이션을 생성하면 AWS Identity and Access Management (IAM) 및 Amazon CloudWatch Logs 리소스가 자동으로 생성됩니다. 를 사용하여 애플리케이션을 생성할 때 이러한 리소스를 별도로 AWS CLI생성합니다.

**Topics**
+ [애플리케이션 생성](#get-started-exercise-7-console-create)
+ [IAM 정책 편집](#get-started-exercise-7-console-iam)
+ [애플리케이션 구성](#get-started-exercise-7-console-configure)
+ [애플리케이션을 실행합니다](#get-started-exercise-7-console-run)
+ [실행 중인 애플리케이션의 지표 관찰](#get-started-exercise-7-console-stop)
+ [Kinesis 스트림의 출력 데이터 관찰](#get-started-exercise-7-console-output)
+ [애플리케이션 중지](#get-started-exercise-stop)

### 애플리케이션 생성
<a name="get-started-exercise-7-console-create"></a>

**애플리케이션을 생성하려면**

1. 에 로그인 AWS Management Console하고 https://console.aws.amazon.com/flink Amazon MSF 콘솔을 엽니다.

1. us-east-1 미국 동부(버지니아 북부) 리전이 올바르게 선택되었는지 확인합니다.

1. 오른쪽의 메뉴를 열고 **Apache Flink 애플리케이션**을 선택한 다음 **스트리밍 애플리케이션 생성**을 선택합니다. 또는 초기 페이지의 시작하기 컨테이너에서 **스트리밍 애플리케이션 생성**을 선택합니다.

1. **스트리밍 애플리케이션 생성** 페이지에서 다음을 수행합니다.
   + **스트림 처리 애플리케이션을 설정하는 방법 선택:** **처음부터 생성**을 선택합니다.
   + **Apache Flink 구성, 애플리케이션 Flink 버전:** **Apache Flink 1.20**을 선택합니다.

1. 애플리케이션 구성
   + **애플리케이션 이름:** **MyApplication**을 입력합니다.
   + **설명:** **My java test app**을 입력합니다.
   + **애플리케이션 리소스 액세스:** **필요한 정책과 함께 IAM 역할 `kinesis-analytics-MyApplication-us-east-1` 생성 및 업데이트**를 선택합니다.

1. **애플리케이션 설정 템플릿** 구성
   + **템플릿:** **개발**을 선택합니다.

1. 페이지 하단에서 **스트리밍 애플리케이션 생성**을 선택합니다.

**참고**  
콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 만들 때 내 애플리케이션에 대한 IAM 역할 및 정책을 둘 수 있는 옵션이 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 애플리케이션 명칭과 리전을 사용하여 다음과 같이 지정됩니다.  
정책: `kinesis-analytics-service-MyApplication-us-east-1`
역할: `kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink는 이전에 Kinesis Data Analytics로 알려졌습니다. 자동으로 생성되는 리소스의 이름은 이전 버전과의 호환성을 위해 `kinesis-analytics-` 접두사를 사용합니다.

### IAM 정책 편집
<a name="get-started-exercise-7-console-iam"></a>

IAM 정책을 편집하여 Kinesis Data Streams에 액세스할 수 있는 권한을 추가합니다.

**정책을 편집하려면:**

1. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)에서 IAM 콘솔을 여세요.

1. **정책**을 선택하세요. 이전 섹션에서 콘솔이 생성한 **`kinesis-analytics-service-MyApplication-us-east-1`** 정책을 선택합니다.

1. **편집**을 선택한 후 **JSON** 탭을 선택합니다.

1. 다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 ID(*012345678901*)를 내 계정 ID로 바꿉니다.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  페이지 하단에서 **다음**을 선택한 다음 **변경 사항 저장**을 선택합니다.

### 애플리케이션 구성
<a name="get-started-exercise-7-console-configure"></a>

애플리케이션 구성을 편집하여 애플리케이션 코드 아티팩트를 설정합니다.

**구성을 편집하려면**

1. **MyApplication** 페이지에서 **구성**을 선택합니다.

1. **애플리케이션 코드 위치** 섹션에서 다음을 수행합니다.
   + **Amazon S3 버킷**의 경우 애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다. **찾아보기**를 선택하고 올바른 버킷을 선택한 다음 **선택**을 선택합니다. 버킷 이름 자체를 클릭하지는 마세요.
   + **Amazon S3 객체 경로**에는 **amazon-msf-java-stream-app-1.0.jar**를 입력합니다.

1. **액세스 권한**에서 **IAM 역할 `kinesis-analytics-MyApplication-us-east-1` 생성 및 업데이트**를 선택합니다.

1. **런타임 속성** 섹션에서 다음 속성을 추가합니다.

1. **새 항목 추가**를 선택하고 다음 각 파라미터를 추가합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/get-started-exercise.html)

1. 다른 어떤 섹션도 수정하지 않습니다.

1. **변경 사항 저장**을 선택합니다.

**참고**  
Amazon CloudWatch 로깅을 활성화하도록 선택하면 Managed Service for Apache Flink에서 로그 그룹 및 로그 스트림을 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.  
로그 그룹: `/aws/kinesis-analytics/MyApplication`
로그 스트림: `kinesis-analytics-log-stream`

### 애플리케이션을 실행합니다
<a name="get-started-exercise-7-console-run"></a>

이제 애플리케이션이 구성이 완료되었으며 실행할 준비가 되었습니다.

**애플리케이션을 실행하려면**

1. Amazon Managed Service for Apache Flink 콘솔에서 **내 애플리케이션**을 선택하고 **실행**을 선택합니다.

1. 다음 페이지의 애플리케이션 복원 구성 페이지에서 **최신 스냅샷으로 실행**을 선택한 다음 **실행**을 선택합니다.

   **애플리케이션 세부 정보**의 **상태**는 애플리케이션이 시작되면 `Ready`에서 `Starting`으로 그리고 `Running`으로 전환됩니다.

애플리케이션이 `Running` 상태일 때 Flink 대시보드를 열 수 있습니다.

**대시보드 열기**

1. **Apache Flink 대시보드 열기**를 선택합니다. 대시보드가 새 페이지에서 열립니다.

1. **실행 중 작업** 목록에서 보이는 단일 작업을 선택합니다.
**참고**  
런타임 속성을 잘못 설정하거나 IAM 정책을 잘못 편집한 경우 애플리케이션 상태는 `Running`으로 변경될 수 있지만, Flink 대시보드에는 작업이 계속 재시작되는 것으로 표시됩니다. 이는 애플리케이션이 잘못 구성되었거나 외부 리소스에 액세스할 권한이 부족할 때 흔히 발생하는 장애 시나리오입니다.  
이러한 상황이 발생하면 Flink 대시보드의 **예외** 탭을 확인하여 문제의 원인을 확인합니다.

### 실행 중인 애플리케이션의 지표 관찰
<a name="get-started-exercise-7-console-stop"></a>

**MyApplication** 페이지의 **Amazon CloudWatch 지표** 섹션을 보면 실행 중인 애플리케이션의 일부 기본 지표를 확인할 수 있습니다.

**지표를 보려면**

1. **새로 고침** 버튼 옆의 드롭다운 목록에서 **10 seconds**를 선택합니다.

1. 애플리케이션이 실행 중이고 정상이면 **가동 시간** 지표가 계속 증가하는 것을 확인할 수 있습니다.

1. **fullrestarts** 지표는 0이어야 합니다. 값이 증가하고 있다면 구성에 문제가 있을 수 있습니다. 문제를 조사하려면 Flink 대시보드의 **예외** 탭을 검토합니다.

1. **실패한 체크포인트 수** 지표는 정상 애플리케이션에서는 0이어야 합니다.
**참고**  
이 대시보드는 고정된 지표 세트를 5분 단위로 표시합니다. CloudWatch 대시보드에서 어떤 지표든 사용하여 사용자 지정 애플리케이션 대시보드를 생성할 수 있습니다.

### Kinesis 스트림의 출력 데이터 관찰
<a name="get-started-exercise-7-console-output"></a>

Python 스크립트 또는 Kinesis Data Generator를 사용하여 입력 스트림에 데이터를 계속 게시하고 있는지 확인합니다.

이제 앞서 수행한 것처럼 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)의 데이터 뷰어를 사용하여 Managed Service for Apache Flink에서 실행 중인 애플리케이션의 출력을 관찰할 수 있습니다.

**출력을 보려면**

1. [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. 이 자습서를 실행하는 데 사용하는 리전과 동일한 리전이 선택되어 있는지 확인합니다. 기본값은 us-east-1 미국 동부(버지니아 북부)입니다. 필요시 리전을 변경합니다.

1. **데이터 스트림을** 선택합니다.

1. 관찰하려는 스트림을 선택합니다. 본 자습서에서는 `ExampleOutputStream`를 사용합니다.

1.  **데이터 뷰어** 탭을 선택합니다.

1. **샤드**를 선택하고 **시작 위치**를 **최신**으로 유지한 다음 **레코드 가져오기**를 선택합니다. ‘이 요청에 대한 레코드를 찾을 수 없음’ 오류가 표시될 수 있습니다. 그렇다면 **레코드 가져오기 재시도**를 선택합니다. 스트림에 게시된 최신 레코드가 표시됩니다.

1. 데이터 열의 값을 선택하여 레코드 내용을 JSON 형식으로 확인합니다.

### 애플리케이션 중지
<a name="get-started-exercise-stop"></a>

애플리케이션을 중지하려면 `MyApplication`이라는 Managed Service for Apache Flink 애플리케이션의 콘솔 페이지로 이동합니다.

**애플리케이션을 중지하려면**

1. **작업** 드롭다운 목록에서 **정지**를 선택합니다.

1. **애플리케이션 세부 정보**의 **상태**는 애플리케이션이 완전히 중지되면 `Running`에서 `Stopping`으로 그리고 `Ready`로 전환됩니다.
**참고**  
Python 스크립트 또는 Kinesis Data Generator에서 입력 스트림으로 데이터를 전송하는 것도 잊지 말고 중단합니다.

## 다음 단계
<a name="get-started-exercise-next-step-4"></a>

[AWS 리소스 정리](getting-started-cleanup.md)