

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink アプリケーションの作成と操作の例
<a name="examples-collapsibles"></a>

このセクションでは、 Managed Service for Apache Flink でのアプリケーションの作成と操作の例を示します。これらの例は、 Managed Service for Apache Flink アプリケーションを作成し、結果をテストするために役立つコード例と詳しい手順が含まれます。

例に進む前に、以下に目を通しておくことをお勧めします。
+ [仕組み](how-it-works.md)
+ [チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md)

**注記**  
これらの例では、米国東部 (バージニア北部) リージョン (us-east-1) を使用していることを前提としています。別のリージョンを使用している場合は、アプリケーションコード、コマンド、IAM ロールを適切に更新してください。　

**Topics**
+ [Managed Service for Apache Flink での Java の例](examples-new-java.md)
+ [Managed Service for Apache Flink での Python の例](examples-new-python.md)
+ [Managed Service for Apache Flink での Scala の例](examples-new-scala.md)

# Managed Service for Apache Flink での Java の例
<a name="examples-new-java"></a>

次の例では、Java で記述したアプリケーションを作成する方法について説明します。



**注記**  
ほとんどの例は、ローカル、開発マシン、選択した IDE、Amazon Managed Service for Apache Flink の両方で実行されるように設計されています。これらは、アプリケーションパラメータを渡すために使用できるメカニズムと、両方の環境でアプリケーションを変更せずに実行するために依存関係を正しく設定する方法を示しています。

## カスタムの TypeInfo を定義するシリアル化パフォーマンスの向上
<a name="improving-serialization-performance-java"></a>

この例は、レコードまたは状態オブジェクトでカスタム TypeInfo を定義して、シリアル化が効率の低い Kryo シリアル化にフォールバックしないようにする方法を示します。これは、オブジェクトに `List` または `Map` が含まれている場合などに必要です。詳細については、Apache Flink ドキュメントの「[Data Types & Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#data-types--serialization)」を参照してください。この例では、オブジェクトのシリアル化が効率の低い Kryo シリアル化にフォールバックするかどうかをテストする方法も示しています。

コード例: [CustomTypeInfo](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Serialization/CustomTypeInfo)

## DataStream API の使用を開始する
<a name="getting-started-datastream-java"></a>

この例は、`DataStream` API を使用して Kinesis データストリームから読み取り、別の Kinesis データストリームに書き込むシンプルなアプリケーションを示しています。この例では、正しい依存関係でファイルをセットアップし、uber-JAR をビルドして設定パラメータを解析する方法を示しています。これにより、アプリケーションをローカル、IDE、Amazon Managed Service for Apache Flink の両方で実行できます。

コード例: [GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted)

## Table API と SQL の使用を開始する
<a name="getting-started-table-java"></a>

この例は、`Table` API と SQL を使用したシンプルなアプリケーションを示しています。同じ Java アプリケーションで `DataStream` API を `Table` API または SQL と統合する方法を示します。また、`DataGen` コネクタを使用して、外部データジェネレーターを必要とせずに、Flink アプリケーション自体の中でランダムなテストデータを生成する方法も示します。

完全な例: [GettingStartedTable](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStartedTable)

## S3Sink を使用する (DataStream API)
<a name="s3-sink-java"></a>

この例は、`DataStream` API の `FileSink` を使用して S3 バケットに JSON ファイルを書き込む方法を示します。

コード例: [S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/S3Sink)

## Kinesis ソース、標準または EFO コンシューマー、シンクを使用する (DataStream API)
<a name="kinesis-EFO-sink-java"></a>

この例は、標準コンシューマーまたは EFO のいずれかを使用して Kinesis データストリームから消費するソースを設定する方法と、Kinesis データストリームへのシンクを設定する方法を示します。

コード例: [KinesisConnectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)

## Amazon Data Firehose シンクを使用する (DataStream API)
<a name="firehose-sink-java"></a>

この例は、Amazon Data Firehose (旧称 Kinesis Data Firehose) にデータを送信する方法を示します。

コード例: [KinesisFirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisFirehoseSink)

## Prometheus シンクコネクタを使用する
<a name="prometheus-sink-java"></a>

この例は、[Prometheus シンクコネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/prometheus/)を使用して時系列データを Prometheus に書き込む方法を示します。

コード例: [PrometheusSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/PrometheusSink)

## ウィンドウイング集約を使用する (DataStream API)
<a name="windowing-aggregations-java"></a>

この例は、`DataStream` API の 4 タイプのウィンドウイング集約を示しています。

1. 処理時間に基づくスライディングウィンドウ

1. イベント時間に基づくスライディングウィンドウ

1. 処理時間に基づくタンブリングウィンドウ

1. イベント時間に基づくタンブリングウィンドウ

コード例: [Windowing](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Windowing) 

## カスタムメトリクスを使用する
<a name="custom-metrics-java"></a>

この例は、Flink アプリケーションにカスタムメトリクスを追加して CloudWatch メトリクスに送信する方法を示します。

コード例: [CustomMetrics](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics)

## Kafka 設定プロバイダーを使用して、ランタイム中に mTLS のカスタムキーストアとトラストストアを取得する
<a name="kafka-keystore-mTLS"></a>

この例は、Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この手法により、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動 AWS Secrets Manager 時に からシークレットをロードできます。

コード例: [Kafka-mTLS-Keystore-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders)

## Kafka 設定プロバイダーを使用して、ランタイム中に SASL/SCRAM 認証のシークレットを取得する
<a name="kafka-secrets"></a>

この例では、Kafka 設定プロバイダーを使用して Amazon S3 から認証情報を取得し AWS Secrets Manager 、信頼ストアをダウンロードして、Kafka コネクタで SASL/SCRAM 認証を設定する方法を示します。 Amazon S3 この手法により、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動 AWS Secrets Manager 時に からシークレットをロードできます。

コード例: [Kafka-SASL\$1SSL-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders)

## Table API/SQL で Kafka 設定プロバイダーを使用して、ランタイム中に mTLS のカスタムキーストアとトラストストアを取得する
<a name="kafka-custom-keystore"></a>

この例は、Table API/SQL で Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この手法により、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動 AWS Secrets Manager 時に からシークレットをロードできます。

コード例: [Kafka-mTLS-Keystore-Sql-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders)

## サイド出力を使用してストリームを分割する
<a name="side-output"></a>

この例は、Apache Flink の[サイド出力](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/)を活用して、指定された属性でストリームを分割する方法を示します。このパターンは、ストリーミングアプリケーションでデッドレターキュー (DLQ) の概念を実装しようとする場合に特に役立ちます。

コード例: [SideOutputs](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/SideOutputs)

## 非同期 I/O を使用して外部エンドポイントを呼び出す
<a name="async-i-o"></a>

この例は、[Apache Flink 非同期 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/) を使用して外部エンドポイントをノンブロッキング方式で呼び出し、復元可能なエラーを再試行する方法を示します。

コード例: [AsyncIO](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/AsyncIO)

# Managed Service for Apache Flink での Python の例
<a name="examples-new-python"></a>

次の例では、Python で記述したアプリケーションを作成する方法について説明します。

**注記**  
ほとんどの例は、ローカル、開発マシン、選択した IDE、Amazon Managed Service for Apache Flink の両方で実行されるように設計されています。これらは、アプリケーションパラメータを渡すために使用できるシンプルなメカニズムと、両方の環境でアプリケーションを変更せずに実行するために依存関係を正しく設定する方法を示しています。

**プロジェクトの依存関係**

ほとんどの PyFlink の例では、Flink コネクタなど、JAR ファイルとして 1 つ以上の依存関係が必要です。これらの依存関係は、Amazon Managed Service for Apache Flink にデプロイされたときにアプリケーションにパッケージ化されている必要があります。

次の例では、開発とテストのためにアプリケーションをローカルで実行し、必要な依存関係を正しくパッケージ化できるツールが既に含まれています。このツールでは、Java JDK11 と Apache Maven を使用する必要があります。具体的な手順については、各例に含まれる README を参照してください。

**例**

## PyFlink の使用を開始する
<a name="getting-started-pyflink"></a>

この例は、Python コードに埋め込まれた SQL を使用する PyFlink アプリケーションの基本構造を示します。このプロジェクトは、コネクタなどの JAR 依存関係が含まれた PyFlink アプリケーション用のスケルトンも提供します。README セクションには、開発のために Python アプリケーションをローカルで実行する方法に関する詳細なガイダンスが記載されています。この例は、この例での唯一の JAR 依存関係である Kinesis SQL コネクタを PyFlink アプリケーションに含める方法も示しています。

コード例: [GettingStarted](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/python/GettingStarted)

## Python の依存関係を追加する
<a name="add-python-dependencies"></a>

この例は、Python 依存関係を PyFlink アプリケーションに追加する最も一般的な方法を示します。この方法は、Boto3 などの単純な依存関係、または PyArrow などの C ライブラリを含む複雑な依存関係に対して機能します。

コード例: [PythonDependencies](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/PythonDependencies)

## ウィンドウイング集約を使用する (DataStream API)
<a name="windowing-aggregations-python"></a>

この例は、Python アプリケーションに埋め込まれた SQL の 4 タイプのウィンドウイング集約を示しています。

1. 処理時間に基づくスライディングウィンドウ

1. イベント時間に基づくスライディングウィンドウ

1. 処理時間に基づくタンブリングウィンドウ

1. イベント時間に基づくタンブリングウィンドウ

コード例: [Windowing](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/Windowing)

## S3 シンクを使用する
<a name="s3-sink-python"></a>

この例は、Python アプリケーションに埋め込まれた SQL を使用して、出力を JSON ファイルとして Amazon S3 に書き込む方法を示します。ファイルを Amazon S3 に書き込んでローテーションするには、S3 シンクのチェックポイントを有効にする必要があります。

コード例: [S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/S3Sink)

## ユーザー定義関数 (UDF) を使用する
<a name="UDF-python"></a>

この例は、ユーザー定義関数を定義し、Python で実装して、Python アプリケーションで実行される SQL コードで使用する方法を示します。

コード例: [UDF](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/UDF)

## Amazon Data Firehose シンクを使用する
<a name="Firehose-sink-python"></a>

この例は、SQL を使用して Amazon Data Firehose にデータを送信する方法を示します。

コード例: [FirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/FirehoseSink)

# Managed Service for Apache Flink での Scala の例
<a name="examples-new-scala"></a>

以下の例では、Scala を使用した Apache Flink を使用してアプリケーションを作成する方法を示しています。



## マルチステップアプリケーションをセットアップする
<a name="getting-started-scala"></a>

この例は、Scala で Flink アプリケーションをセットアップする方法を示します。依存関係を含めて uber-JAR をビルド築するように SBT プロジェクトを設定する方法を示します。

コード例: [GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/scala/GettingStarted)