翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon EMR 上でコンソールを通じて Flink とやりとりするにはいくつかの方法があります。ResourceManager の UI 追跡にある Flink インターフェイスと、コマンドライン上です。このいずれの場合にも、JAR ファイルを Flink アプリケーションに送信できます。送信した JAR ファイルは Flink JobManager によって管理されるジョブになります。JobManager は、Flink セッションのアプリケーションマスターデーモンをホストする YARN ノードで動作しています。
長期実行のクラスターまたは一時的なクラスターで、YARN ジョブとして Flink アプリケーションを実行できます。長期実行のジョブでは、複数の Flink ジョブを Amazon EMR で実行する 1 つの Flink クラスターに送信できます。一時的なクラスターで Flink ジョブを稼働する場合、Amazon EMR クラスターは Flink アプリケーションを実行するために必要な時間のためだけに存在します。そのため、使用したリソースと費やした時間に対してのみ課金されます。Flink ジョブは、Amazon EMR AddSteps
API オペレーション、 オペレーションへのステップ引数、 RunJobFlow
または create-cluster
コマンドを使用して AWS CLI add-steps
送信できます。
長期実行のクラスターのステップとして、Flink YARN アプリケーションを起動します
複数のクライアントが YARN API オペレーションを介して作業を送信できる Flink アプリケーションを起動するには、クラスターを作成するか、既存のクラスターに Flink アプリケーションを追加する必要があります。新しいクラスターを作成する手順については、「Flink を使用してクラスターを作成する」を参照してください。既存のクラスターで YARN セッションを開始するには、コンソール、 AWS CLI、Java SDK から次のステップに従います。
注記
Amazon EMR バージョン 5.5.0 では、実行を簡素化するための yarn-session.sh
スクリプトのラッパーとして、flink-yarn-session
コマンドが追加されました。以前のバージョンの Amazon EMR を使用している場合は、コンソールの引数、または AWS CLI コマンドの Args
を bash -c
"/usr/lib/flink/bin/yarn-session.sh -d"
で置き換えます。
コンソールを使用して Flink ジョブを既存のクラスターに送信するには
flink-yarn-session
コマンドを使用して、Flink セッションを既存のクラスターに送信します。
https://console.aws.amazon.com/emr
で Amazon EMR コンソールを開きます。 -
クラスターリストで、以前に起動したクラスターを選択します。
-
クラスターの詳細ページで、[Steps (ステップ)]、[Add Step (ステップの追加)] の順に選択します。
-
次のガイドラインに従ってパラメータを入力し、[追加] を選択します。
パラメータ 説明 ステップタイプ
カスタム JAR 名前
ステップを識別するのに役立つ名前。例えば、 <example-flink-step-name>
などです。Jar location
command-runner.jar
引数
flink-yarn-session
コマンドとアプリケーションに適切な引数。たとえば、flink-yarn-session -d
は、デタッチ状態 (-d
) で、 YARN クラスターで Flink セッションを開始します。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。
を使用して既存のクラスターで Flink ジョブを送信するには AWS CLI
-
Flink ジョブを長期実行クラスターに追加するには、
add-steps
コマンドを使用します。次のコマンド例では、YARN クラスター内において Flink セッションが切り離された状態 (-d
) で開始されるよう、Args="flink-yarn-session", "-d"
を指定しています。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。 aws emr add-steps --cluster-id
<j-XXXXXXXX>
--steps Type=CUSTOM_JAR,Name=<example-flink-step-name>
,Jar=command-runner.jar,Args="flink-yarn-session","-d"
長期実行クラスター上の既存の Flink アプリケーションに作業を送信する
長期実行クラスターに既存の Flink アプリケーションがある場合は、作業を送信するために、クラスターの Flink アプリケーション ID を指定できます。アプリケーション ID を取得するには、 yarn application -list
で、 AWS CLI または YarnClient
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089
この Flink セッションのアプリケーション ID は です。これを使用してapplication_1473169569237_0002
、 AWS CLI または SDK からアプリケーションに作業を送信できます。
例 SDK for Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("
myClusterId
") .withSteps(stepConfigs));
例 AWS CLI
aws emr add-steps --cluster-id
<j-XXXXXXXX>
\ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region<region-code>
一時的な Flink ジョブを送信する
次の例では Flink ジョブを実行する一時的なクラスターを起動し、完了時に終了します。
例 SDK for Java
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
public class Main_test {
public static void main(String[] args) {
AWSCredentials credentials_profile = null;
try {
credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load credentials from .aws/credentials file. " +
"Make sure that the credentials file exists and the profile name is specified within it.",
e);
}
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
.withRegion(Regions.US_WEST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
"s3://path/to/output/");
StepConfig flinkRunWordCountStep = new StepConfig()
.withName("Flink add a wordcount step and terminate")
.withActionOnFailure("CONTINUE")
.withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCountStep);
Application flink = new Application().withName("Flink");
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("flink-transient")
.withReleaseLabel("emr-5.20.0")
.withApplications(flink)
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withLogUri("s3://path/to/my/logfiles")
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName("myEc2Key")
.withEc2SubnetId("subnet-12ab3c45")
.withInstanceCount(3)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m4.large")
.withSlaveInstanceType("m4.large"))
.withSteps(stepConfigs);
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println("The cluster ID is " + result.toString());
}
}
例 AWS CLI
Flink ジョブの完了時に終了する一時的なクラスターを作成するには、create-cluster
サブコマンドを使用します。
aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=
<YourKeyName>
,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""