기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Amazon에서 Flink 작업 작업 EMR
콘솔, ResourceManager 추적 UI에 있는 Flink 인터페이스 및 명령줄을 EMR통해 Amazon 의 Flink와 상호 작용하는 방법에는 여러 가지가 있습니다. 이러한 파일 중 하나를 사용하여 Flink 애플리케이션에 JAR 파일을 제출할 수 있습니다. JAR 파일을 제출하면 Flink 에서 관리하는 작업이 됩니다 JobManager. JobManager 는 Flink 세션 Application Master 데몬을 호스팅하는 YARN 노드에 있습니다.
Flink 애플리케이션을 장기 실행 클러스터 또는 임시 클러스터에서 YARN 작업으로 실행할 수 있습니다. 장기 실행 클러스터에서는 Amazon 에서 실행되는 하나의 Flink 클러스터에 여러 Flink 작업을 제출할 수 있습니다EMR. 임시 클러스터에서 Flink 작업을 실행하는 경우 Amazon EMR 클러스터는 Flink 애플리케이션을 실행하는 데 걸리는 시간 동안만 존재하므로 사용된 리소스 및 시간에 대해서만 요금이 부과됩니다. Amazon EMR AddSteps
API 작업, 작업에 대한 단계 인수, 또는 create-cluster
명령을 RunJobFlow
통해 Flink 작업을 제출할 수 있습니다 AWS CLI add-steps
.
장기 실행 클러스터에서 Flink YARN 애플리케이션을 단계적으로 시작합니다.
여러 클라이언트가 YARN API 작업을 통해 작업을 제출할 수 있는 Flink 애플리케이션을 시작하려면 클러스터를 생성하거나 Flink 애플리케이션을 기존 클러스터에 추가해야 합니다. 새 클러스터를 생성하는 방법에 대한 지침은 Flink를 포함하는 클러스터 생성 부분을 참조하세요. 기존 클러스터에서 YARN 세션을 시작하려면 콘솔, AWS CLI또는 Java 에서 다음 단계를 사용합니다SDK.
참고
flink-yarn-session
명령은 실행을 간소화하기 위해 Amazon EMR 버전 5.5.0에 yarn-session.sh
스크립트의 래퍼로 추가되었습니다. 이전 버전의 Amazon 를 사용하는 경우 콘솔의 bash -c "/usr/lib/flink/bin/yarn-session.sh -d"
인수 또는 AWS CLI 명령의 인수를 EMR대체Args
합니다.
콘솔에서 기존 클러스터의 Flink 작업을 제출하는 방법
기존 클러스터에서 flink-yarn-session
명령을 사용하여 Flink 세션을 제출합니다.
https://console.aws.amazon.com/emr
에서 Amazon EMR 콘솔을 엽니다. -
클러스터 목록에서 이전에 시작한 클러스터를 선택합니다.
-
클러스터 세부 정보 페이지에서 단계, Add Step(단계 추가)을 선택합니다.
-
아래 지침에 따라 파라미터를 입력하고 추가를 선택합니다.
파라미터 설명 단계 유형
사용자 지정 JAR 이름
단계 식별을 위한 이름. 예: <example-flink-step-name>
.Jar location(Jar 위치)
command-runner.jar
인수
애플리케이션에 따라 여러 가지 인수를 사용하는
flink-yarn-session
명령. 예:flink-yarn-session -d
는 YARN 클러스터 내에서 분리된 상태()로 Flink 세션을 시작합니다-d
. 인수 세부 정보는 최신 Flink 설명서의 YARN 설정을참조하세요.
를 사용하여 기존 클러스터에서 Flink 작업을 제출하려면 AWS CLI
-
add-steps
명령을 사용하여 장기 실행 클러스터에 Flink 작업을 추가합니다. 다음 예제 명령은 클러스터 내에서 YARN 분리된 상태()로 Flink 세션을 시작Args="flink-yarn-session", "-d"
하도록 지정합니다-d
. 인수 세부 정보는 최신 Flink 설명서의 YARN 설정을참조하세요. aws emr add-steps --cluster-id
<j-XXXXXXXX>
--steps Type=CUSTOM_JAR,Name=<example-flink-step-name>
,Jar=command-runner.jar,Args="flink-yarn-session","-d"
장기 실행 클러스터에서 기존 Flink 애플리케이션에 작업 제출
장기 실행 클러스터에 기존 Flink 애플리케이션이 이미 있는 경우 클러스터의 Flink 애플리케이션 ID를 지정하여 클러스터에 작업을 제출할 수 있습니다. 애플리케이션 ID를 가져오려면 yarn application -list
에서 AWS CLI 또는 YarnClient
$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089
이 Flink 세션의 애플리케이션 ID는 AWS CLI 또는 에서 애플리케이션에 작업을 제출하는 데 사용할 수 application_1473169569237_0002
있는 입니다SDK.
예 SDK Java용
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("
myClusterId
") .withSteps(stepConfigs));
예 AWS CLI
aws emr add-steps --cluster-id
<j-XXXXXXXX>
\ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region<region-code>
임시 Flink 작업 제출
다음 예제에서는 Flink 작업을 실행한 다음 완료 시 종료하는 임시 클러스터를 시작합니다.
예 SDK Java용
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
예 AWS CLI
create-cluster
하위 명령을 사용하여 Flink 작업 완료 시 종료되는 임시 클러스터를 생성합니다.
aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=
<YourKeyName>
,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""