Amazon에서 Flink 작업 작업 EMR - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon에서 Flink 작업 작업 EMR

콘솔, ResourceManager 추적 UI에 있는 Flink 인터페이스 및 명령줄을 EMR통해 Amazon 의 Flink와 상호 작용하는 방법에는 여러 가지가 있습니다. 이러한 파일 중 하나를 사용하여 Flink 애플리케이션에 JAR 파일을 제출할 수 있습니다. JAR 파일을 제출하면 Flink 에서 관리하는 작업이 됩니다 JobManager. JobManager 는 Flink 세션 Application Master 데몬을 호스팅하는 YARN 노드에 있습니다.

Flink 애플리케이션을 장기 실행 클러스터 또는 임시 클러스터에서 YARN 작업으로 실행할 수 있습니다. 장기 실행 클러스터에서는 Amazon 에서 실행되는 하나의 Flink 클러스터에 여러 Flink 작업을 제출할 수 있습니다EMR. 임시 클러스터에서 Flink 작업을 실행하는 경우 Amazon EMR 클러스터는 Flink 애플리케이션을 실행하는 데 걸리는 시간 동안만 존재하므로 사용된 리소스 및 시간에 대해서만 요금이 부과됩니다. Amazon EMR AddSteps API 작업, 작업에 대한 단계 인수, 또는 create-cluster 명령을 RunJobFlow 통해 Flink 작업을 제출할 수 있습니다 AWS CLI add-steps.

여러 클라이언트가 YARN API 작업을 통해 작업을 제출할 수 있는 Flink 애플리케이션을 시작하려면 클러스터를 생성하거나 Flink 애플리케이션을 기존 클러스터에 추가해야 합니다. 새 클러스터를 생성하는 방법에 대한 지침은 Flink를 포함하는 클러스터 생성 부분을 참조하세요. 기존 클러스터에서 YARN 세션을 시작하려면 콘솔, AWS CLI또는 Java 에서 다음 단계를 사용합니다SDK.

참고

flink-yarn-session 명령은 실행을 간소화하기 위해 Amazon EMR 버전 5.5.0에 yarn-session.sh 스크립트의 래퍼로 추가되었습니다. 이전 버전의 Amazon 를 사용하는 경우 콘솔의 bash -c "/usr/lib/flink/bin/yarn-session.sh -d" 인수 또는 AWS CLI 명령의 인수를 EMR대체Args합니다.

콘솔에서 기존 클러스터의 Flink 작업을 제출하는 방법

기존 클러스터에서 flink-yarn-session 명령을 사용하여 Flink 세션을 제출합니다.

  1. https://console.aws.amazon.com/emr 에서 Amazon EMR 콘솔을 엽니다.

  2. 클러스터 목록에서 이전에 시작한 클러스터를 선택합니다.

  3. 클러스터 세부 정보 페이지에서 단계, Add Step(단계 추가)을 선택합니다.

  4. 아래 지침에 따라 파라미터를 입력하고 추가를 선택합니다.

    파라미터 설명

    단계 유형

    사용자 지정 JAR

    이름

    단계 식별을 위한 이름. 예: <example-flink-step-name>.

    Jar location(Jar 위치)

    command-runner.jar

    인수

    애플리케이션에 따라 여러 가지 인수를 사용하는 flink-yarn-session 명령. 예: flink-yarn-session -d 는 YARN 클러스터 내에서 분리된 상태()로 Flink 세션을 시작합니다-d. 인수 세부 정보는 최신 Flink 설명서의 YARN 설정을 참조하세요.

를 사용하여 기존 클러스터에서 Flink 작업을 제출하려면 AWS CLI
  • add-steps 명령을 사용하여 장기 실행 클러스터에 Flink 작업을 추가합니다. 다음 예제 명령은 클러스터 내에서 YARN 분리된 상태()로 Flink 세션을 시작Args="flink-yarn-session", "-d"하도록 지정합니다-d. 인수 세부 정보는 최신 Flink 설명서의 YARN 설정을 참조하세요.

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

장기 실행 클러스터에 기존 Flink 애플리케이션이 이미 있는 경우 클러스터의 Flink 애플리케이션 ID를 지정하여 클러스터에 작업을 제출할 수 있습니다. 애플리케이션 ID를 가져오려면 yarn application -list에서 AWS CLI 또는 YarnClient API 작업을 통해 를 실행합니다.

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

이 Flink 세션의 애플리케이션 ID는 AWS CLI 또는 에서 애플리케이션에 작업을 제출하는 데 사용할 수 application_1473169569237_0002있는 입니다SDK.

예 SDK Java용
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
예 AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region <region-code>

다음 예제에서는 Flink 작업을 실행한 다음 완료 시 종료하는 임시 클러스터를 시작합니다.

예 SDK Java용
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
예 AWS CLI

create-cluster 하위 명령을 사용하여 Flink 작업 완료 시 종료되는 임시 클러스터를 생성합니다.

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""