/**
 * Maps each {@link EmrClusterDefinitionApplication} in the supplied list onto a new AWS {@link Application}.
 * <p>
 * The name, version and arguments are always copied across. The additional-info map is populated only when the
 * definition carries at least one additional-info parameter; otherwise it is left untouched on the new object.
 *
 * @param emrClusterDefinitionApplications the application definitions to convert
 *
 * @return a new list holding one {@link Application} per definition, in the same order as the input
 */
public List<Application> getApplications(List<EmrClusterDefinitionApplication> emrClusterDefinitionApplications)
{
    List<Application> converted = new ArrayList<>();

    for (EmrClusterDefinitionApplication definition : emrClusterDefinitionApplications)
    {
        Application target = new Application()
            .withName(definition.getName())
            .withVersion(definition.getVersion())
            .withArgs(definition.getArgs());

        // Only build the additional-info map when there is something to put in it.
        List<Parameter> extraInfo = definition.getAdditionalInfoList();
        if (!CollectionUtils.isEmpty(extraInfo))
        {
            target.setAdditionalInfo(getMap(extraInfo));
        }

        converted.add(target);
    }

    return converted;
}
/**
 * End-to-end check of the "emr" workflow: starts an EMR cluster, runs the workflow against it and verifies
 * that the attempt succeeds and the expected output file is produced.
 */
@Test
public void test()
        throws Exception
{
    // Single m3.xlarge instance that stays up while no steps are running.
    JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
            .withEc2KeyName("digdag-test")
            .withInstanceCount(1)
            .withKeepJobFlowAliveWhenNoSteps(true)
            .withMasterInstanceType("m3.xlarge")
            .withSlaveInstanceType("m3.xlarge");

    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Digdag Test")
            .withReleaseLabel("emr-5.2.0")
            .withApplications(Stream.of("Hadoop", "Hive", "Spark", "Flink")
                    .map(appName -> new Application().withName(appName))
                    .collect(toList()))
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withServiceRole("EMR_DefaultRole")
            .withVisibleToAllUsers(true)
            .withLogUri(tmpS3FolderUri + "/logs/")
            .withInstances(instances);

    // Start the cluster and record its id in clusterIds (presumably consumed by test teardown - confirm).
    String clusterId = emr.runJobFlow(request).getJobFlowId();
    clusterIds.add(clusterId);

    // Push the project and start the "emr" workflow, pointing it at the freshly created cluster.
    Id attemptId = pushAndStart(
            server.endpoint(),
            projectDir,
            "emr",
            ImmutableMap.of(
                    "test_s3_folder", tmpS3FolderUri.toString(),
                    "test_cluster", clusterId,
                    "outfile", outfile.toString()));

    // Wait up to 30 minutes for the attempt to succeed.
    expect(Duration.ofMinutes(30), attemptSuccess(server.endpoint(), attemptId));

    validateTdSparkQueryOutput();

    assertThat(Files.exists(outfile), is(true));
}
/**
 * Submits a single step named "Spark Step" to an existing EMR cluster and returns the id of that step.
 *
 * <p>The step runs {@code command-runner.jar} with the arguments obtained by splitting {@code paramsStr}
 * on commas, and uses the action-on-failure {@code CONTINUE}. The EMR client is created for the region named
 * by the {@code AWS_REGION} environment variable and is shut down before this method returns, whether or not
 * the request succeeds.
 *
 * @param paramsStr comma-separated step arguments; it is split with {@code String.split(",")}, so an
 *                  individual argument cannot itself contain a comma
 * @param clusterId id of the job flow (cluster) the step is added to
 * @return the first step id in the {@code AddJobFlowSteps} response
 */
protected String fireEMRJob(String paramsStr, String clusterId) {
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    try {
        emr.setRegion(Region.getRegion(Regions.fromName(System.getenv().get("AWS_REGION"))));

        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs(paramsStr.split(","));

        StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);

        // The varargs overload avoids an anonymous ArrayList subclass that would capture the enclosing instance.
        AddJobFlowStepsRequest request = new AddJobFlowStepsRequest(clusterId)
            .withSteps(sparkStep);

        AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
        return result.getStepIds().get(0);
    } finally {
        // The client is local to this call; release its connection pool so repeated calls do not leak.
        emr.shutdown();
    }
}