Java 类com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult 实例源码

项目:digdag    文件:EmrIT.java   
@Test
public void test()
        throws Exception
{
    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Digdag Test")
            .withReleaseLabel("emr-5.2.0")
            .withApplications(Stream.of("Hadoop", "Hive", "Spark", "Flink")
                    .map(s -> new Application().withName(s))
                    .collect(toList()))
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withServiceRole("EMR_DefaultRole")
            .withVisibleToAllUsers(true)
            .withLogUri(tmpS3FolderUri + "/logs/")
            .withInstances(new JobFlowInstancesConfig()
                    .withEc2KeyName("digdag-test")
                    .withInstanceCount(1)
                    .withKeepJobFlowAliveWhenNoSteps(true)
                    .withMasterInstanceType("m3.xlarge")
                    .withSlaveInstanceType("m3.xlarge"));

    RunJobFlowResult result = emr.runJobFlow(request);

    String clusterId = result.getJobFlowId();

    clusterIds.add(clusterId);

    Id attemptId = pushAndStart(server.endpoint(), projectDir, "emr", ImmutableMap.of(
            "test_s3_folder", tmpS3FolderUri.toString(),
            "test_cluster", clusterId,
            "outfile", outfile.toString()));
    expect(Duration.ofMinutes(30), attemptSuccess(server.endpoint(), attemptId));

    validateTdSparkQueryOutput();

    assertThat(Files.exists(outfile), is(true));
}
项目:aws-big-data-blog    文件:EMRUtils.java   
/**
 * This method uses method the AWS Java to launch an Apache HBase cluster on Amazon EMR. 
 * 
 * @param client - AmazonElasticMapReduce client that interfaces directly with the Amazon EMR Web Service
 * @param clusterIdentifier - identifier of an existing cluster
 * @param amiVersion - AMI to use for launching this cluster
 * @param keypair - A keypair for SSHing into the Amazon EMR master node
 * @param masterInstanceType - Master node Amazon EC2 instance type 
 * @param coreInstanceType - core nodes Amazon EC2 instance type 
 * @param logUri - An Amazon S3 bucket for your 
 * @param numberOfNodes - total number of nodes in this cluster including master node
 * @return
 */
public static String createCluster(AmazonElasticMapReduce client,
        String clusterIdentifier,
        String amiVersion,
        String keypair,
        String masterInstanceType,
        String coreInstanceType,
        String logUri,
        int numberOfNodes) {

    if (clusterExists(client, clusterIdentifier)) {
        LOG.info("Cluster " + clusterIdentifier + " is available");
        return clusterIdentifier;
    }

    //Error checking
    if (amiVersion == null || amiVersion.isEmpty()) throw new RuntimeException("ERROR: Please specify an AMI Version");
    if (keypair == null || keypair.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair");
    if (masterInstanceType == null || masterInstanceType.isEmpty()) throw new RuntimeException("ERROR: Please specify a Master Instance Type");
    if (logUri == null || logUri.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs.");
    if (numberOfNodes < 0) throw new RuntimeException("ERROR: Please specify at least 1 node");

      RunJobFlowRequest request = new RunJobFlowRequest()
        .withAmiVersion(amiVersion)
        .withBootstrapActions(new BootstrapActionConfig()
                     .withName("Install HBase")
                     .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                     .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase")))
        .withName("Job Flow With HBAse Actions")     
        .withSteps(new StepConfig() //enable debugging step
                    .withName("Enable debugging")
                    .withActionOnFailure("TERMINATE_CLUSTER")
                    .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()), 
                    //Start HBase step - after installing it with a bootstrap action
                    createStepConfig("Start HBase","TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()), 
                    //add HBase backup step
                    createStepConfig("Modify backup schedule","TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs()))
        .withLogUri(logUri)
        .withInstances(new JobFlowInstancesConfig()
        .withEc2KeyName(keypair)
        .withInstanceCount(numberOfNodes)
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType(masterInstanceType)
        .withSlaveInstanceType(coreInstanceType));

    RunJobFlowResult result = client.runJobFlow(request);

    String state = null;
    while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) {
        try {
            Thread.sleep(10 * 1000);
            LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available.");
        } catch (InterruptedException e) {

        }

        if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")){
            LOG.error("Could not create EMR Cluster");
            System.exit(-1);    
        }
    }
    LOG.info("Created cluster " + result.getJobFlowId());
    LOG.info("Cluster " + clusterIdentifier + " is available"); 
    return result.getJobFlowId();
}