將中的 EmrCluster 資源用AWSSDK於 Java - AWS Data Pipeline

AWS Data Pipeline 不再提供給新客戶。現有客戶 AWS Data Pipeline 可繼續正常使用此服務。進一步了解

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將中的 EmrCluster 資源用AWSSDK於 Java

下列範例示範如何使用EmrClusterEmrActivity建立 Amazon EMR 4.x 叢集,以使用 Java SDK 執行星火步驟:

public class dataPipelineEmr4 { public static void main(String[] args) { AWSCredentials credentials = null; credentials = new ProfileCredentialsProvider("/path/to/AwsCredentials.properties","default").getCredentials(); DataPipelineClient dp = new DataPipelineClient(credentials); CreatePipelineRequest createPipeline = new CreatePipelineRequest().withName("EMR4SDK").withUniqueId("unique"); CreatePipelineResult createPipelineResult = dp.createPipeline(createPipeline); String pipelineId = createPipelineResult.getPipelineId(); PipelineObject emrCluster = new PipelineObject() .withName("EmrClusterObj") .withId("EmrClusterObj") .withFields( new Field().withKey("releaseLabel").withStringValue("emr-4.1.0"), new Field().withKey("coreInstanceCount").withStringValue("3"), new Field().withKey("applications").withStringValue("spark"), new Field().withKey("applications").withStringValue("Presto-Sandbox"), new Field().withKey("type").withStringValue("EmrCluster"), new Field().withKey("keyPair").withStringValue("myKeyName"), new Field().withKey("masterInstanceType").withStringValue("m3.xlarge"), new Field().withKey("coreInstanceType").withStringValue("m3.xlarge") ); PipelineObject emrActivity = new PipelineObject() .withName("EmrActivityObj") .withId("EmrActivityObj") .withFields( new Field().withKey("step").withStringValue("command-runner.jar,spark-submit,--executor-memory,1g,--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/lib/spark-examples.jar,10"), new Field().withKey("runsOn").withRefValue("EmrClusterObj"), new Field().withKey("type").withStringValue("EmrActivity") ); PipelineObject schedule = new PipelineObject() .withName("Every 15 Minutes") .withId("DefaultSchedule") .withFields( new Field().withKey("type").withStringValue("Schedule"), new Field().withKey("period").withStringValue("15 Minutes"), new Field().withKey("startAt").withStringValue("FIRST_ACTIVATION_DATE_TIME") ); PipelineObject defaultObject = new PipelineObject() .withName("Default") .withId("Default") .withFields( new Field().withKey("failureAndRerunMode").withStringValue("CASCADE"), new Field().withKey("schedule").withRefValue("DefaultSchedule"), new Field().withKey("resourceRole").withStringValue("DataPipelineDefaultResourceRole"), new Field().withKey("role").withStringValue("DataPipelineDefaultRole"), new Field().withKey("pipelineLogUri").withStringValue("s3://myLogUri"), new Field().withKey("scheduleType").withStringValue("cron") ); List<PipelineObject> pipelineObjects = new ArrayList<PipelineObject>(); pipelineObjects.add(emrActivity); pipelineObjects.add(emrCluster); pipelineObjects.add(defaultObject); pipelineObjects.add(schedule); PutPipelineDefinitionRequest putPipelineDefintion = new PutPipelineDefinitionRequest() .withPipelineId(pipelineId) .withPipelineObjects(pipelineObjects); PutPipelineDefinitionResult putPipelineResult = dp.putPipelineDefinition(putPipelineDefintion); System.out.println(putPipelineResult); ActivatePipelineRequest activatePipelineReq = new ActivatePipelineRequest() .withPipelineId(pipelineId); ActivatePipelineResult activatePipelineRes = dp.activatePipeline(activatePipelineReq); System.out.println(activatePipelineRes); System.out.println(pipelineId); } }