

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Python 用 Amazon Managed Service for Apache Flink 入門
<a name="gs-python"></a>

このセクションでは、Python とテーブル API を使用した Apache Flink 向けマネージドサービスの基本概念を紹介します。アプリケーションの作成とテストに使用できるオプションについて説明します。また、このガイドのチュートリアルを完了し、初めてアプリケーションを作成するのに必要なツールのインストール方法についても説明します。

**Topics**
+ [

## Managed Service for Apache Flink アプリケーションのコンポーネントを確認する
](#gs-python-table-components)
+ [

## 前提条件を満たす
](#gs-python-prerequisites)
+ [

# Python アプリケーション用の Apache Flink の作成と実行
](gs-python-createapp.md)
+ [

# AWS リソースをクリーンアップする
](gs-python-cleanup.md)

## Managed Service for Apache Flink アプリケーションのコンポーネントを確認する
<a name="gs-python-table-components"></a>

**注記**  
Amazon Managed Service for Apache Flink はすべての [Apache Flink API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/overview/#flinks-apis) をサポートします。選択する API により、アプリケーションの構造が若干異なります。Python で Apache Flink アプリケーションを開発する一般的なアプローチの 1 つは、Python コードに埋め込まれた SQL を使用してアプリケーションフローを定義することです。これは、次の入門チュートリアルで従うアプローチです。

データを処理するため、Managed Service for Apache Flink アプリケーションでは、Apache Flink ランタイムを使用して入力を処理し、出力を生成するデータフローを定義する Python スクリプトが使用されます。

通常の Managed Service for Apache Flink アプリケーションには、次のコンポーネントがあります。
+ **ランタイムプロパティ:** *ランタイムプロパティ* を使用すると、アプリケーションコードを再コンパイルせずにアプリケーションを設定できます。
+ **ソース:** アプリケーションは 1 つ以上の*ソース*からデータを消費します。ソースは[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)を使用して、Kinesis データストリームや Amazon MSK トピックなどの外部システムからデータを読み込みます。特殊なコネクタを使用して、アプリケーション内からデータを生成することもできます。SQL を使用すると、アプリケーションによってソースが*ソーステーブル*として定義されます。
+ **変換:** アプリケーションにより、データをフィルタリング、強化、集計できる 1 つ以上の*変換*を使用してデータが処理されます。SQL を使用すると、アプリケーションによって変換が SQL クエリとして定義されます。
+ **シンク:** アプリケーションにより、*シンク*を通じて外部ソースにデータが送信されます。シンクは[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)を使用して、Kinesis データストリーム、Amazon MSK トピック、Amazon S3 バケット、リレーショナルデータベースなどの外部システムにデータを送信します。特別なコネクタを使用して、開発目的として出力データを出力することもできます。SQL を使用すると、アプリケーションは結果を挿入する*シンクテーブル*として、シンクを定義します。詳細については、「[Managed Service for Apache Flink でシンクを使用してデータを書き込む](how-sinks.md)」を参照してください。

Python アプリケーションには、外部依存関係が必要な場合もあります (アプリケーションが使用する追加の Python ライブラリや Flink コネクタなど)。アプリケーションをパッケージ化するとき、アプリケーションに必要なすべての依存関係を含める必要があります。このチュートリアルでは、コネクタの依存関係を含める方法、ならびに Amazon Managed Service for Apache Flink にデプロイするためにアプリケーションをパッケージ化する方法について説明します。

## 前提条件を満たす
<a name="gs-python-prerequisites"></a>

このチュートリアルを完了するには、以下が必要です。
+ **Python 3.11** は [VirtualEnv (venv)](https://docs.python.org/3.11/library/venv.html)、[Conda](https://docs.conda.io/en/latest/)、[Miniconda](https://docs.anaconda.com/miniconda/) などのスタンドアロン環境を使用することが推奨されます。
+  [Git クライアント](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) - Git クライアントをまだインストールしていない場合、インストールします。
+ [Java Development Kit (JDK) バージョン 11](https://www.oracle.com/java/technologies/downloads/#java11) - Java JDK 11 をインストールして、`JAVA_HOME` 環境変数がインストール場所を指すように設定します。JDK 11 がない場合、[Amazon Corretto](https://docs.aws.amazon.com/corretto) または推奨される任意の標準 JDK を使用できます。
  + JDK が正しくインストールされていることを確認するには、次のコマンドを実行します。Amazon Corretto 11 以外の JDK を使用している場合、出力は異なります。バージョンが 11.x であることを確認してください。

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/) - Apache Maven をまだインストールしていない場合、インストールします。詳細については、「[Installing Apache Maven](https://maven.apache.org/install.html)」を参照してください。
  + Apache Maven のインストールをテストするには、次のコマンドを実行します。

    ```
    $ mvn -version
    ```

**注記**  
アプリケーションは Python で記述されますが、Apache Flink は Java 仮想マシン (JVM) で実行されます。Kinesis コネクタなど、ほとんどの依存関係が JAR ファイルとして配布されます。これらの依存関係を管理してアプリケーションを ZIP ファイルにパッケージ化するには、[Apache Maven](https://maven.apache.org/) を使用します。このチュートリアルでは、この方法について説明します。

**警告**  
ローカル開発には、Python 3.11 を使用することをお勧めします。Flink ランタイム 1.19 を使用した Amazon Managed Service for Apache Flink で使用される Python バージョンと同じです。  
Python 3.12 に Python Flink ライブラリ 1.19 をインストールすると、失敗する可能性があります。  
デフォルトでマシンに別の Python バージョンがインストールされている場合、Python 3.11 を使用して VirtualEnv などのスタンドアロン環境を作成することをお勧めします。

**ローカル開発用の IDE**

[PyCharm](https://www.jetbrains.com/pycharm/) や [Visual Studio Code](https://code.visualstudio.com/) などの開発環境を使用して、アプリケーションを開発およびコンパイルすることをお勧めします。

その後、[Amazon Managed Service for Apache Flink (DataStream API) の概要](getting-started.md) の最初の 2 ステップを完了します。
+ [AWS アカウントをセットアップし、管理者ユーザーを作成する](setting-up.md)
+ [AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)

開始するには、「[ アプリケーションの作成](gs-python-createapp.md)」を参照してください。

# Python アプリケーション用の Apache Flink の作成と実行
<a name="gs-python-createapp"></a>

このセクションでは、ソースおよびシンクとして Kinesis ストリームを使用して、Python アプリケーション用の Managed Service for Apache Flink を作成します。

**Topics**
+ [

## 依存リソースを作成する
](#gs-python-resources)
+ [

## ローカルの開発環境のセットアップ
](#gs-python-set-up)
+ [

## Apache Flink ストリーミング Python コードをダウンロードして検証する
](#gs-python-download)
+ [

## JAR 依存関係を管理する
](#gs-python-jar-dependencies)
+ [

## 入力ストリームにサンプルレコードを書き込む
](#gs-python-sample-records)
+ [

## アプリケーションをローカルで実行する
](#gs-python-run-locally)
+ [

## Kinesis ストリームで入出力データを観察する
](#gs-python-observe-input-output)
+ [

## ローカルで実行されているアプリケーションを停止する
](#gs-python-stop)
+ [

## アプリケーションコードをパッケージ化する
](#gs-python-package-code)
+ [

## Amazon S3 バケットにデモアプリケーションをアップロードする
](#gs-python-upload-bucket)
+ [

## Managed Service for Apache Flink アプリケーションを作成して設定する
](#gs-python-7)
+ [

## 次のステップ
](#gs-python-next-step-4)

## 依存リソースを作成する
<a name="gs-python-resources"></a>

このエクササイズで Apache Flink 用 Managed Service を作成する前に、以下の依存リソースを作成します。
+ 入力用と出力用の 2 つの Kinesis ストリーム。
+ アプリケーションのコードを保存する Amazon S3 バケット。

**注記**  
このチュートリアルでは、アプリケーションを us-east-1 リージョンにデプロイすることが前提とされます。別のリージョンを使用する場合、必要に応じてすべてのステップを調整する必要があります。

### 2 つの Kinesis ストリームを作成する
<a name="gs-python-resources-streams"></a>

この演習用に Managed Service for Apache Flink アプリケーションを作成する前に、アプリケーション (この例では us-east-1) のデプロイに使用するのと同じリージョンで 2 つの Kinesis Data Streams (`ExampleInputStream` および `ExampleOutputStream`) を作成します。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールの操作方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」を参照してください。

**データストリームを作成するには (AWS CLI)**

1. 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### Amazon S3 バケットを作成する
<a name="gs-python-resources-s3"></a>

Amazon S3 バケットは、コンソールを使用して作成できます。このリソースの作成手順については、次のトピックを参照してください。
+ 「*Amazon Simple Storage Service ユーザーガイド*」の「[How Do I Create an S3 Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)」。Amazon S3 バケットにグローバルに一意の名前を付けます (例えば、ログイン名を追加して)。
**注記**  
このチュートリアルで使用するリージョン (us-east-1) で必ず S3 バケットを作成してください。

### その他のリソース
<a name="gs-python-resources-cw"></a>

アプリケーションを作成すると、Apache Flink 用 Managed Service によって次の Amazon CloudWatch リソースが作成されます（これらのリソースがまだ存在しない場合）。
+ `/AWS/KinesisAnalytics-java/<my-application>`という名前のロググループ。
+ `kinesis-analytics-log-stream` というログストリーム。

## ローカルの開発環境のセットアップ
<a name="gs-python-set-up"></a>

開発およびデバッグの場合、マシンで Python Flink アプリケーションを実行できます。`python main.py` または任意の Python IDE を使用して、コマンドラインからアプリケーションを起動できます。

**注記**  
開発マシンには Python 3.10 または 3.11 の他に、Java 11、Apache Maven、Git がインストールされている必要があります。[PyCharm](https://www.jetbrains.com/pycharm/) や [Visual Studio Code](https://code.visualstudio.com/) などの IDE を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、先に進む前に「[演習を完了するための前提条件を満たす](gs-python.md#gs-python-prerequisites)」を参照してください。

### PyFlink ライブラリをインストールする
<a name="gs-python-install-pyflink"></a>

アプリケーションを開発してローカルで実行するには、Flink Python ライブラリをインストールする必要があります。

1. VirtualEnv、Conda、同様の Python ツールを使用して、スタンドアロンの Python 環境を作成します。

1. PyFlink ライブラリをその環境にインストールします。Amazon Managed Service for Apache Flink で使用する Apache Flink ランタイムバージョンと同じものを使用します。現在、推奨されるランタイムは 1.19.1 です。

   ```
   $ pip install apache-flink==1.19.1
   ```

1. アプリケーションを実行するときの環境がアクティブであることを確認してください。IDE でアプリケーションを実行する場合、IDE がランタイムとして環境を使用していることを確認してください。プロセスは使用している IDE によって異なります。
**注記**  
必要な準備は PyFlink ライブラリをインストールするだけです。マシンに Apache Flink クラスターをインストールする必要は**ありません**。

### AWS セッションを認証する
<a name="gs-python-authenticate"></a>

アプリケーションは Kinesis Data Streams を使用してデータを発行します。ローカルで実行する場合は、Kinesis データストリームに書き込むアクセス許可を持つ有効な AWS 認証済みセッションが必要です。次のステップに従って、セッションを認証します。

1.  AWS CLI と、有効な認証情報が設定された名前付きプロファイルがない場合は、「」を参照してください[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)。

1.  AWS CLI が正しく設定されており、ユーザーが次のテストレコードを発行して Kinesis データストリームに書き込むアクセス許可を持っていることを確認します。

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. IDE に統合するプラグインがある場合は AWS、それを使用して IDE で実行されているアプリケーションに認証情報を渡すことができます。詳細については、「[AWS Toolkit for PyCharm](https://aws.amazon.com/pycharm/)」、「[「AWS Toolkit for Visual Studio Code](https://aws.amazon.com/visualstudiocode/)」、「[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)」を参照してください。

## Apache Flink ストリーミング Python コードをダウンロードして検証する
<a name="gs-python-download"></a>

この例の Python アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. `./python/GettingStarted` ディレクトリに移動します。

### アプリケーションコンポーネントを確認する
<a name="gs-python-review"></a>

アプリケーションコードは `main.py` にあります。Python に埋め込まれた SQL を使用して、アプリケーションのフローを定義します。

**注記**  
最適化された開発者エクスペリエンスをお使いのマシンで開発するため、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行されるように設計されています。アプリケーションは環境変数 `IS_LOCAL = true` を使用して、ローカルで実行されているタイミングを検出します。環境変数 `IS_LOCAL = true` をシェルまたは IDE の実行設定で設定する必要があります。
+ アプリケーションは実行環境を設定して、ランタイム設定を読み取ります。Amazon Managed Service for Apache Flink およびローカルの両方で動作させるため、アプリケーションは `IS_LOCAL` 変数を確認します。
  + 次の内容は、アプリケーションを Amazon Managed Service for Apache Flink で実行するときのデフォルト動作です。

    1. アプリケーションでパッケージ化された依存関係を読み込む 詳細については、「(リンク)」 を参照してください。

    1. Amazon Managed Service for Apache Flink アプリケーションで定義したランタイムプロパティから設定を読み込みます。詳細については、「(リンク)」 を参照してください。
  + ローカルでアプリケーションを実行するときにアプリケーションが `IS_LOCAL = true` を検出した場合

    1. プロジェクトから外部依存関係を読み込みます。

    1. プロジェクトに含まれる `application_properties.json` ファイルから設定を読み込みます。

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ アプリケーションは [Kinesis コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)を使用して、`CREATE TABLE` ステートメントでソーステーブルが定義されます。このテーブルは、入力 Kinesis ストリームからデータを読み取ります。アプリケーションにより、ランタイム設定からストリームの名前、リージョン、初期位置が取得されます。

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ この例では、アプリケーションで [Kinesis コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) を使用してシンクテーブルも定義されます。このテーブルは出力 Kinesis ストリームにデータを送信します。

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ 最後に、アプリケーションにより、ソーステーブルからシンクテーブルを `INSERT INTO...` する SQL が実行されます。より複雑なアプリケーションでは、シンクに書き込む前にデータを変換する追加のステップが含まれている可能性があります。

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ アプリケーションをローカルで実行するには、`main()` 関数の最後に別のステップを追加する必要があります。

  ```
  if is_local:
      table_result.wait()
  ```

  このステートメントがないと、ローカルで実行したときにアプリケーションは直ちに終了します。Amazon Managed Service for Apache Flink でアプリケーションを実行するとき、このステートメントを実行することはできません。

## JAR 依存関係を管理する
<a name="gs-python-jar-dependencies"></a>

通常、PyFlink アプリケーションには 1 つ以上のコネクタが必要です。このチュートリアルのアプリケーションは [Kinesis コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) を使用します。Apache Flink は Java JVM で実行されるため、Python でアプリケーションを実装するかどうかを問わず、コネクタは JAR ファイルとして配布されます。Amazon Managed Service for Apache Flink にデプロイするとき、これらの依存関係をアプリケーションと一緒にパッケージ化する必要があります。

この例では、Apache Maven を使用して依存関係を取得し、アプリケーションをパッケージ化して Managed Service for Apache Flink で実行する方法について示されます。

**注記**  
依存関係を取得してパッケージ化する別の方法があります。この例では、1 つ以上のコネクタで正しく動作するメソッドが示されます。コードを変更せず、アプリケーションをローカルで開発用に実行することも、Managed Service for Apache Flink で実行することもできます。

### pom.xml ファイルを使用する
<a name="gs-python-jar-pom"></a>

Apache Maven は `pom.xml` ファイルを使用して、依存関係およびアプリケーションのパッケージ化を制御します。

JAR 依存関係は、`<dependencies>...</dependencies>` ブロックの `pom.xml` ファイルで指定されます。

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

使用するコネクタの正しいアーティファクトおよびバージョンを確認するには、「[Managed Service for Apache Flink で Apache Flink コネクタを使用する](how-flink-connectors.md)」を参照してください。使用している Apache Flink のバージョンを必ず参照してください。この例では、Kinesis コネクタを使用します。Apache Flink 1.19 の場合、コネクタのバージョンは `4.3.0-1.19` です。

**注記**  
Apache Flink 1.19 を使用している場合、特にこのバージョン用にリリースされたコネクタのバージョンはありません。1.18 用にリリースされたコネクタを使用します。

### ダウンロードおよびパッケージ化の依存関係
<a name="gs-python-dependencies-download"></a>

Maven を使用して `pom.xml` ファイルで定義されている依存関係をダウンロードし、Python Flink アプリケーション用にパッケージ化します。

1. `python/GettingStarted` という Python 入門プロジェクトを含むディレクトリに移動します。

1. 次のコマンドを実行します。

```
$ mvn package
```

Maven は、`./target/pyflink-dependencies.jar` という名前の新しいファイルを作成します。マシンでローカルに開発するとき、Python アプリケーションはこのファイルを検索します。

**注記**  
このコマンドの実行を忘れて、アプリケーションを実行しようとした場合、**[識別子「kinesis」のファクトリが見つかりませんでした]** というエラーが表示され失敗します。

## 入力ストリームにサンプルレコードを書き込む
<a name="gs-python-sample-records"></a>

このセクションでは、アプリケーションが処理するサンプルレコードをストリームに送信します。サンプルデータを生成するオプションは 2 つあり、Python スクリプトまたは [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) のいずれかを使用します。

### Python スクリプトを使用してサンプルデータを生成する
<a name="gs-python-sample-data"></a>

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

**注記**  
この Python スクリプトを実行するには、Python 3.x を使用して [AWS SDK for Python (Boto)](https://aws.amazon.com/developer/language/python/) ライブラリがインストールされている必要があります。

**Kinesis 入力ストリームにテストデータの送信を開始する方法**

1. [Data Generator GitHub リポジトリ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)から Data Generator `stock.py` Python スクリプトをダウンロードします。

1. `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

チュートリアルの残りの部分を実践する間、スクリプトを実行し続けてください。Apache Flink アプリケーションを実行できるようになりました。

### Kinesis Data Generator を使用してサンプルデータを生成する
<a name="gs-python-sample-kinesis"></a>

Python スクリプトを使用する代替手段として、[ホストバージョン](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)でも利用可能な [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) を使用し、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行されるため、マシンに何もインストールする必要はありません。

**Kinesis Data Generator を設定して実行する方法**

1. 「[Kinesis Data Generator ドキュメント](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)」の指示に従って、ツールへのアクセスを設定します。ユーザーとパスワードを設定する CloudFormation テンプレートを実行します。

1. CloudFormation テンプレートによって生成された URL を介して Kinesis Data Generator にアクセスします。CloudFormation テンプレートが完了したら、**[出力]** タブに URL が表示されます。

1. Data Generator を設定します。
   + **リージョン:** このチュートリアルで使用しているリージョン (us-east-1) を選択します。
   + **ストリーム/配信ストリーム:** アプリケーションが使用する入力ストリーム (`ExampleInputStream`) を選択します。
   + **1 秒あたりのレコード数:** 100
   + **レコードテンプレート:** 次のテンプレートをコピーして貼り付けます。

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. テンプレートをテストする: **[テンプレートのテスト]** を選択し、生成されたレコードが次の内容と同じであることを確認してください。

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Data Generator を起動する: **[データ送信の選択]** を選択します。

現在、Kinesis Data Generator は `ExampleInputStream` にデータを送信しています。

## アプリケーションをローカルで実行する
<a name="gs-python-run-locally"></a>

`python main.py` を使用してコマンドラインから実行するか IDE から実行して、アプリケーションをローカルでテストできます。

アプリケーションをローカルで実行するには、前のセクションで記述されているとおり、PyFlink ライブラリの正しいバージョンがインストールされている必要があります。詳細については、「(リンク)」 を参照してください。

**注記**  
続行する前に、入力ストリームと出力ストリームが利用できることを確認してください。「[2 つの Amazon Kinesis Data Streams を作成する](get-started-exercise.md#get-started-exercise-1)」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認してください。「[AWS セッションを認証する](get-started-exercise.md#get-started-exercise-2-5)」を参照してください。

### Python プロジェクトを IDE にインポートする
<a name="gs-python-import"></a>

IDE でアプリケーションの使用を開始するには、Python プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、`./python/GettingStarted` サブディレクトリのコンテンツを IDE にインポートします。

コードを既存の Python プロジェクトとしてインポートします。

**注記**  
新しい Python プロジェクトをインポートする正確なプロセスは、使用している IDE によって異なります。

### ローカルアプリケーション設定を確認する
<a name="gs-python-check-configuration"></a>

ローカルで実行すると、アプリケーションでは `./src/main/resources` のプロジェクトのリソースフォルダにある `application_properties.json` ファイルの設定が使用されます。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### Python アプリケーションをローカルで実行する
<a name="gs-python-run-locally"></a>

アプリケーションはローカルで実行できますが、コマンドラインから通常の Python スクリプトとして実行するか、IDE から実行できます。

**コマンドラインからアプリケーションを実行する方法**

1. Python Flink ライブラリをインストールした Conda や VirtualEnv などのスタンドアロン Python 環境が、現在アクティブであることを確認してください。

1. `mvn package` を少なくとも 1 回実行したことを確認してください。

1. `IS_LOCAL = true` 環境変数を設定します:

   ```
   $ export IS_LOCAL=true
   ```

1. アプリケーションを通常の Python スクリプトとして実行します。

   ```
   $python main.py
   ```

**IDE 内からアプリケーションを実行する方法**

1. 次の設定で `main.py` スクリプトを実行するように IDE を設定します。

   1. PyFlink ライブラリをインストールした Conda や VirtualEnv などのスタンドアロン Python 環境を使用します。

   1.  AWS 認証情報を使用して、入出力 Kinesis データストリームにアクセスします。

   1. `IS_LOCAL = true` を設定します。

1. 実行設定を設定する正確なプロセスは、IDE によって異なります。

1. IDE を設定したら、アプリケーションの実行中に Python スクリプトを実行し、IDE が提供するツールを使用します。

### アプリケーションログをローカルで確認する
<a name="gs-python-run-IDE"></a>

ローカルで実行するとき、アプリケーションはコンソールでログを表示しません。ただし、アプリケーションの起動時に数行が出力されて表示されます。PyFlink は、Python Flink ライブラリがインストールされているディレクトリ内のファイルにログを書き込みます。アプリケーションにより、起動時にログの場所が出力されます。次のコマンドを実行してログを取得することができます。

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. ファイルをログ記録ディレクトリに一覧表示します。通常、1 つの `.log` ファイルがあります。

1. アプリケーションの実行中にファイルを監視します (`tail -f <log-path>/<log-file>.log`)。

## Kinesis ストリームで入出力データを観察する
<a name="gs-python-observe-input-output"></a>

Amazon Kinesis コンソールで **[データビューワー]** を使用することで、(生成サンプル Python) または Amazon Kinesis Data Generator (リンク) によって入力ストリームに送信されるレコードを観察できます。

**レコードを観察する方法**  Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。  リージョンが、このチュートリアルを実行しているリージョンと同じであることを確認してください。デフォルトでは us-east-1 米国東部 (バージニア北部) です。リージョンが一致しない場合は変更します。   **[データストリーム]** を選択します。   `ExampleInputStream` または `ExampleOutputStream.` のいずれか、観察するストリームを選択します。   **[データビューワー]** タブを選択します。   任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードを取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。   データ列の値を選択すると、レコードの内容を JSON 形式で確認できます。   

## ローカルで実行されているアプリケーションを停止する
<a name="gs-python-stop"></a>

IDE で実行されているアプリケーションを停止します。　 通常、IDE には「停止」オプションがあります。正確な場所および方法は IDE によって異なります。

## アプリケーションコードをパッケージ化する
<a name="gs-python-package-code"></a>

このセクションでは、Apache Maven を使用して、アプリケーションコードおよび必要な依存関係をすべて .zip ファイルにパッケージ化します。

Maven パッケージコマンドを再度実行します。

```
$ mvn package
```

このコマンドは `target/managed-flink-pyflink-getting-started-1.0.0.zip` ファイルを生成します。

## Amazon S3 バケットにデモアプリケーションをアップロードする
<a name="gs-python-upload-bucket"></a>

このセクションでは、前のセクションで作成した .zip ファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合、「(リンク)」を参照してください。

**アプリケーションコードの JAR ファイルをアップロードする方法**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションコード用に以前作成したバケットを選択します。

1. **アップロード**を選択します。

1. **ファイルの追加**を選択します。

1. 前のステップで生成された .zip ファイルに移動します (`target/managed-flink-pyflink-getting-started-1.0.0.zip`)。

1. 他の設定を変更せずに **[アップロード]** を選択します。

## Managed Service for Apache Flink アプリケーションを作成して設定する
<a name="gs-python-7"></a>

コンソールまたは AWS CLIのいずれかを使用して Managed Service for Apache Flink アプリケーションを作成および設定することができます。このチュートリアルでは、コンソールを使用します。

### アプリケーションの作成
<a name="gs-python-7-console-create"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. 正しいリージョンが選択されていることを確認してください: 米国東部 (バージニア北部)us-east-1。

1. 右側のメニューを開いて **[Apache Flink アプリケーション]** を選択したら、**[ストリーミングアプリケーションの作成]** を選択します。または、最初のページの**[開始方法]** セクションの **[ストリーミングアプリケーションの作成]** を選択します。

1. **[ストリーミングアプリケーションの作成]** ページで、次の操作を行います。
   + **[ストリーム処理アプリケーションの設定方法の選択]** には、**[最初から作成]** を選択します。
   + **[Apache Flink の設定、Application Flink バージョン]** には、**[Apache Flink 1.19]** を選択します。
   + **[アプリケーション設定]** の場合
     + [**アプリケーション名**] には **MyApplication** と入力します。
     + [**Description (説明)**] に **My Python test app** と入力します。
     + **[アプリケーションリソースへのアクセス]** で、**[必要なポリシーを使用して IAM ロール kinesis-analytics-MyApplication-us-east-1 を作成/更新]** を選択します。
   + **[アプリケーション設定のテンプレート]** の場合
     + **[テンプレート]** には、**[開発]** を選択します。
   + **[ストリーミングアプリケーションの作成]** を選択します。

**注記**  
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-west-2`
ロール: `kinesisanalytics-MyApplication-us-west-2`
Amazon Managed Service for Apache Flink は、以前は *Kinesis Data Analytics* と呼ばれていました。自動的に生成されるリソースの名前には、下位互換性のために「`kinesis-analytics`」のプレフィックスが付きます。

### IAM ポリシーを編集する
<a name="gs-python-7-console-iam"></a>

Amazon S3 バケットにアクセスする許可を追加するように IAM ポリシーを編集します。

**IAM ポリシーを編集して S3 バケット権限を追加するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-east-1`** ポリシーを選択します。

1. **[編集]** を選択して、**[JSON]** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. [**次へ**]、[**変更を保存**] の順に選択します。

### アプリケーションを設定する
<a name="gs-python-7-console-configure"></a>

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

**アプリケーションを構成するには**

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. **[アプリケーションコードの場所]** セクションで、次の操作を行います。
   + **[Amazon S3 バケット]** には、アプリケーションコード用に以前作成したバケットを選択します。**[参照]** を選択して正しいバケットを選択したら、**[選択]** を選択します。バケット名を選択しないでください。
   + [**Amazon S3 オブジェクトへのパス**] で、**managed-flink-pyflink-getting-started-1.0.0.zip**と入力します。

1. **[アクセス許可]** には、**[必要なポリシーで IAM ロール `kinesis-analytics-MyApplication-us-east-1` を作成/更新]** を選択します。

1. **[ランタイムプロパティ]** に移動し、他のすべての設定はデフォルト値のままにします。

1. **[新しい項目の追加]** を選択して、次のパラメータをすべて追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/gs-python-createapp.html)

1. 他のセクションは変更せず、**[変更を保存]** を選択します。

**注記**  
Amazon CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

### アプリケーションを実行する
<a name="gs-python-7-console-run"></a>

これでアプリケーションが設定され、実行する準備が整いました。

**アプリケーションを実行するには**

1. Amazon Managed Service for Apache Flink のコンソールで、**[自分のアプリケーション]** を選択して **[実行]** を選択します。

1. 次のページのアプリケーション復元設定ページで、**[最新のスナップショットで実行]** を選択したら、**[実行]** を選択します。

   **[アプリケーション詳細]** の **[ステータス]** は「`Ready`」から「`Starting`」に移行し、アプリケーションが起動されると「`Running`」に移行します。

アプリケーションが「`Running`」ステータスのとき、Flink ダッシュボードを開けるようになります。

**ダッシュボードを開くには**

1. **[Open Apache Flink ダッシュボード]** を選択します。ダッシュボードは新しいページで開かれます。

1. **[実行中のジョブ]** リストで、表示されている 1 つのジョブを選択します。
**注記**  
ランタイムプロパティを設定したり、IAM ポリシーを誤って編集したりすると、アプリケーションのステータスが「`Running`」になることがありますが、Flink ダッシュボードではジョブが継続的に再起動されていることが表示されます。これは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合の一般的な障害シナリオです。  
これが発生した場合、Flink ダッシュボードの **[例外]** タブを見て、問題の原因を確認してください。

### 実行中のアプリケーションのメトリクスを観察する
<a name="gs-python-observe-metrics"></a>

**[MyApplication]** ページの **[Amazon CloudWatch メトリクス]** セクションで、実行中のアプリケーションの基本的なメトリクスの一部を確認できます。

**メトリクスを表示する方法**

1. **[更新]** ボタンの横にあるドロップダウンリストから **[10 秒]** を選択します。

1. アプリケーションが実行中で正常なとき、**[アップタイム]** メトリクスが継続的に増加していることを確認できます。

1. **[完全再起動]** メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの **[例外]** タブを確認してください。

1. 正常なアプリケーションでは、**[失敗したチェックポイント数]** メトリクスは 0 です。
**注記**  
このダッシュボードには、5 分の粒度で一定の一連のメトリクスが表示されます。CloudWatch ダッシュボードで任意のメトリクスを使用してカスタムアプリケーションのダッシュボードを作成できます。

### Kinesis ストリームの出力データを観察する
<a name="gs-python-observe-output"></a>

Python スクリプトまたは Kinesis Data Generator のいずれかを使用して、入力にデータを引き続き発行していることを確認してください。

「[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)」のデータビューワーを使用 (以前に行った内容と同様に) することで、Managed Service for Apache Flink で実行されているアプリケーションの出力を観察できるようになりました。

**出力の表示方法**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. リージョンが、このチュートリアルの実行に使用しているリージョンと同じであることを確認してください。デフォルトでは、us-east-1 米国東部 (バージニア北部) です。必要に応じてリージョンを変更します。

1. **[データストリーム]** を選択します。

1. 観察するストリームを選択します。このチュートリアルでは、`ExampleOutputStream` を使用します。

1.  **[データビューワー]** タブを選択します。

1. 任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードの取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。

1. データ列の値を選択して、レコードの内容を JSON 形式で確認します。

### アプリケーションを停止する
<a name="gs-python-7-console-stop"></a>

アプリケーションを停止するには、`MyApplication` という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動します。

**アプリケーションを停止するには**

1. **[アクション]** ドロップダウンリストで、**[停止]** を選択します。

1. **[アプリケーション詳細]** の **[ステータス]** は「`Running`」から「`Stopping`」に移行し、アプリケーションが完全に停止すると「`Ready`」に移行します。
**注記**  
Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も必ず停止してください。

## 次のステップ
<a name="gs-python-next-step-4"></a>

[AWS リソースをクリーンアップする](gs-python-cleanup.md)

# AWS リソースをクリーンアップする
<a name="gs-python-cleanup"></a>

このセクションでは、入門 (Python) チュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。

**Topics**
+ [

## Managed Service for Apache Flink アプリケーションを削除する
](#gs-python-cleanup-app)
+ [

## Kinesis Data Streams を削除する
](#gs-python-cleanup-msk)
+ [

## Amazon S3 オブジェクトとバケットを削除する
](#gs-python-cleanup-s3)
+ [

## IAM リソースを削除する
](#gs-python-cleanup-iam)
+ [

## CloudWatch リソースを削除する
](#gs-python-cleanup-cw)

## Managed Service for Apache Flink アプリケーションを削除する
<a name="gs-python-cleanup-app"></a>

アプリケーションを削除するには、次の手順に従います。

**アプリケーションを削除するには**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. Apache Flink 用 Managed Serviceパネルで、**MyApplication** を選択します。

1. **[アクション]** ドロップダウンリストで **[削除]** を選択したら、削除を確定します。

## Kinesis Data Streams を削除する
<a name="gs-python-cleanup-msk"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. **[データストリーム]** を選択します。

1. 作成した 2 つのストリーム、`ExampleInputStream` および `ExampleOutputStream` を選択します。

1. **[アクション]** ドロップダウンリストで **[削除]** を選択したら、削除を確定します。

## Amazon S3 オブジェクトとバケットを削除する
<a name="gs-python-cleanup-s3"></a>

S3 オブジェクトとバケットを削除するには、次の手順に従います。

**S3 バケットからオブジェクトを削除する方法**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションアーティファクト用に作成した S3 バケットを選択します。

1. アップロードした `amazon-msf-java-stream-app-1.0.jar` という名前のアプリケーションアーティファクトを選択します。

1. **[削除]** を選択し、削除を確定します。

**S3 バケットを削除するには**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アーティファクト用に作成したバケットを選択します。

1. **[削除]** を選択し、削除を確定します。
**注記**  
S3 バケットを削除するには、空である必要があります。

## IAM リソースを削除する
<a name="gs-python-cleanup-iam"></a>

以下の手順を使用して IAM リソースを削除します。

**IAM リソースを削除するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションバーで、[**ポリシー**] を選択します。

1. フィルターコントロールに「**kinesis**」と入力します。

1. **[kinesis-analytics-service-MyApplication-us-east-1]** ポリシーを選択します。

1. [**ポリシーアクション**]、[**削除**] の順に選択します。

1. ナビゲーションバーで ［**ロール**］を選択します。

1. **[kinesis-analytics-MyApplication-us-east-1]** ロールを選択します。

1. **[ロールの削除]** を選択し、削除を確定します。

## CloudWatch リソースを削除する
<a name="gs-python-cleanup-cw"></a>

以下の手順を使用して CloudWatch リソースを削除します。

**CloudWatch リソースを削除するには**

1. CloudWatch コンソールの [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) を開いてください。

1. ナビゲーションバーで [**ログ**] を選択します。

1. 「**/aws/kinesis-analytics/MyApplication**」ロググループを選択してください。

1. [**ロググループの削除**]を選択し、削除を確認してください。