Java Class com.amazonaws.services.elasticmapreduce.model.StepConfig Code Examples

Project: herd    File: MockEmrOperationsImpl.java
private MockEmrJobFlow createNewCluster(RunJobFlowRequest jobFlowRequest, String status, StatusChangeReason reason, StatusTimeline timeline)
{
    MockEmrJobFlow cluster = new MockEmrJobFlow();
    cluster.setJobFlowId(getNewJobFlowId());
    cluster.setJobFlowName(jobFlowRequest.getName());
    cluster.setStatus(status);
    cluster.setStatusTimeline(timeline);
    cluster.setStatusChangeReason(reason);
    emrClusters.put(cluster.getJobFlowId(), cluster);

    // Add the steps
    for (StepConfig stepConfig : jobFlowRequest.getSteps())
    {
        addClusterStep(cluster.getJobFlowId(), stepConfig);
    }

    return cluster;
}
Project: herd    File: EmrPigStepHelper.java
@Override
public StepConfig getEmrStepConfig(Object step)
{
    EmrPigStep pigStep = (EmrPigStep) step;

    // Default ActionOnFailure is to cancel the execution and wait
    ActionOnFailure actionOnFailure = ActionOnFailure.CANCEL_AND_WAIT;

    if (pigStep.isContinueOnError() != null && pigStep.isContinueOnError())
    {
        // Override based on user input
        actionOnFailure = ActionOnFailure.CONTINUE;
    }

    // If there are no arguments to the pig script
    if (CollectionUtils.isEmpty(pigStep.getScriptArguments()))
    {
        // Just build the StepConfig object and return
        return new StepConfig().withName(pigStep.getStepName().trim()).withActionOnFailure(actionOnFailure)
            .withHadoopJarStep(new StepFactory().newRunPigScriptStep(pigStep.getScriptLocation().trim()));
    }
    // If there are arguments specified
    else
    {
        return new StepConfig().withName(pigStep.getStepName().trim()).withActionOnFailure(actionOnFailure).withHadoopJarStep(new StepFactory()
            .newRunPigScriptStep(pigStep.getScriptLocation().trim(),
                pigStep.getScriptArguments().toArray(new String[pigStep.getScriptArguments().size()])));
    }
}
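For orientation, a minimal usage sketch of this helper. In herd the helper is a Spring-managed bean, and the EmrPigStep setter names below are assumptions inferred from the getters used above; the step name and S3 location are hypothetical.

// Illustrative only; direct construction and setter names are assumptions.
EmrPigStep pigStep = new EmrPigStep();
pigStep.setStepName("my-pig-step");                          // hypothetical name
pigStep.setScriptLocation("s3://my-bucket/scripts/etl.pig"); // hypothetical location
pigStep.setContinueOnError(false);                           // yields CANCEL_AND_WAIT

StepConfig stepConfig = new EmrPigStepHelper().getEmrStepConfig(pigStep);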
Project: herd    File: EmrHiveStepHelper.java
@Override
public StepConfig getEmrStepConfig(Object step)
{
    EmrHiveStep emrHiveStep = (EmrHiveStep) step;

    // Default ActionOnFailure is to cancel the execution and wait
    ActionOnFailure actionOnFailure = ActionOnFailure.CANCEL_AND_WAIT;

    if (emrHiveStep.isContinueOnError() != null && emrHiveStep.isContinueOnError())
    {
        // Override based on user input
        actionOnFailure = ActionOnFailure.CONTINUE;
    }

    // If there are no arguments to the hive script
    if (CollectionUtils.isEmpty(emrHiveStep.getScriptArguments()))
    {
        // Just build the StepConfig object and return
        return new StepConfig().withName(emrHiveStep.getStepName().trim()).withActionOnFailure(actionOnFailure)
            .withHadoopJarStep(new StepFactory().newRunHiveScriptStep(emrHiveStep.getScriptLocation().trim()));
    }
    // If there are arguments specified
    else
    {
        // For each argument, add "-d" option
        List<String> hiveArgs = new ArrayList<>();
        for (String hiveArg : emrHiveStep.getScriptArguments())
        {
            hiveArgs.add("-d");
            hiveArgs.add(hiveArg);
        }
        // Return the StepConfig object
        return new StepConfig().withName(emrHiveStep.getStepName().trim()).withActionOnFailure(actionOnFailure).withHadoopJarStep(
            new StepFactory().newRunHiveScriptStep(emrHiveStep.getScriptLocation().trim(), hiveArgs.toArray(new String[hiveArgs.size()])));
    }
}
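To make the "-d" expansion concrete, here is a self-contained sketch of the transformation performed above; the argument values are hypothetical and java.util imports are assumed.

// Hypothetical script arguments.
List<String> scriptArguments = Arrays.asList("DATE=2016-01-01", "ENV=prod");

List<String> hiveArgs = new ArrayList<>();
for (String hiveArg : scriptArguments)
{
    hiveArgs.add("-d");
    hiveArgs.add(hiveArg);
}
// hiveArgs is now ["-d", "DATE=2016-01-01", "-d", "ENV=prod"], so each
// argument reaches Hive as a -d key=value definition.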
Project: herd    File: EmrHadoopJarStepHelper.java
@Override
public StepConfig getEmrStepConfig(Object step)
{
    EmrHadoopJarStep hadoopJarStep = (EmrHadoopJarStep) step;

    return emrHelper.getEmrHadoopJarStepConfig(hadoopJarStep.getStepName(), hadoopJarStep.getJarLocation(), hadoopJarStep.getMainClass(),
        hadoopJarStep.getScriptArguments(), hadoopJarStep.isContinueOnError());
}
Project: herd    File: EmrShellStepHelper.java
@Override
public StepConfig getEmrStepConfig(Object step)
{
    EmrShellStep emrShellStep = (EmrShellStep) step;

    // Hadoop Jar provided by Amazon for running Shell Scripts
    String hadoopJarForShellScript = configurationHelper.getProperty(ConfigurationValue.EMR_SHELL_SCRIPT_JAR);

    // Default ActionOnFailure is to cancel the execution and wait
    ActionOnFailure actionOnFailure = ActionOnFailure.CANCEL_AND_WAIT;
    if (emrShellStep.isContinueOnError() != null && emrShellStep.isContinueOnError())
    {
        // Override based on user input
        actionOnFailure = ActionOnFailure.CONTINUE;
    }

    // Add the script location
    List<String> argsList = new ArrayList<>();
    argsList.add(emrShellStep.getScriptLocation().trim());

    // Add the script arguments
    if (!CollectionUtils.isEmpty(emrShellStep.getScriptArguments()))
    {
        for (String argument : emrShellStep.getScriptArguments())
        {
            argsList.add(argument.trim());
        }
    }

    // Return the StepConfig object
    HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(hadoopJarForShellScript).withArgs(argsList);
    return new StepConfig().withName(emrShellStep.getStepName().trim()).withActionOnFailure(actionOnFailure).withHadoopJarStep(jarConfig);
}
Project: herd    File: EmrHelper.java
/**
 * Builds the StepConfig for the Hadoop jar step.
 *
 * @param stepName the step name.
 * @param jarLocation the location of the jar.
 * @param mainClass the main class.
 * @param scriptArguments the arguments.
 * @param isContinueOnError whether to continue on error.
 *
 * @return the step config.
 */
public StepConfig getEmrHadoopJarStepConfig(String stepName, String jarLocation, String mainClass, List<String> scriptArguments, Boolean isContinueOnError)
{
    // Default ActionOnFailure is to cancel the execution and wait
    ActionOnFailure actionOnFailure = ActionOnFailure.CANCEL_AND_WAIT;

    if (isContinueOnError != null && isContinueOnError)
    {
        // Override based on user input
        actionOnFailure = ActionOnFailure.CONTINUE;
    }

    // If there are no arguments
    if (CollectionUtils.isEmpty(scriptArguments))
    {
        // Build the StepConfig object and return
        return new StepConfig().withName(stepName.trim()).withActionOnFailure(actionOnFailure)
            .withHadoopJarStep(new HadoopJarStepConfig().withJar(jarLocation.trim()).withMainClass(mainClass));
    }
    else
    {
        // If there are arguments, include the arguments in the StepConfig object
        return new StepConfig().withName(stepName.trim()).withActionOnFailure(actionOnFailure).withHadoopJarStep(
            new HadoopJarStepConfig().withJar(jarLocation.trim()).withMainClass(mainClass)
                .withArgs(scriptArguments.toArray(new String[scriptArguments.size()])));
    }
}
Project: herd    File: EmrDaoImpl.java
@Override
public String addEmrStep(String clusterId, StepConfig emrStepConfig, AwsParamsDto awsParamsDto) throws Exception
{
    List<StepConfig> steps = new ArrayList<>();

    steps.add(emrStepConfig);

    // Add the job flow request
    AddJobFlowStepsRequest jobFlowStepRequest = new AddJobFlowStepsRequest(clusterId, steps);
    List<String> emrStepIds = emrOperations.addJobFlowStepsRequest(getEmrClient(awsParamsDto), jobFlowStepRequest);

    return emrStepIds.get(0);
}
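Stripped of the herd wrappers, the same operation is a single AWS SDK v1 call. A minimal sketch, where the client construction and the cluster ID placeholder are assumptions:

// Hedged sketch of the equivalent direct SDK call; "j-XXXXXXXXXXXXX" is a placeholder.
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();
AddJobFlowStepsRequest request = new AddJobFlowStepsRequest("j-XXXXXXXXXXXXX", Arrays.asList(emrStepConfig));
String stepId = emr.addJobFlowSteps(request).getStepIds().get(0);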
Project: herd    File: EmrDaoImpl.java
/**
 * Creates the list of step config objects for Hive/Pig installation.
 *
 * @param emrClusterDefinition the EMR cluster definition.
 *
 * @return the list of step configurations containing all the steps for the given definition.
 */
private List<StepConfig> getStepConfig(EmrClusterDefinition emrClusterDefinition)
{
    StepFactory stepFactory = new StepFactory();
    List<StepConfig> appSteps = new ArrayList<>();

    // Create install hive step and add to the StepConfig list
    if (StringUtils.isNotBlank(emrClusterDefinition.getHiveVersion()))
    {
        StepConfig installHive =
            new StepConfig().withName("Hive " + emrClusterDefinition.getHiveVersion()).withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
                .withHadoopJarStep(stepFactory.newInstallHiveStep(emrClusterDefinition.getHiveVersion()));
        appSteps.add(installHive);
    }

    // Create install Pig step and add to the StepConfig List
    if (StringUtils.isNotBlank(emrClusterDefinition.getPigVersion()))
    {
        StepConfig installPig =
            new StepConfig().withName("Pig " + emrClusterDefinition.getPigVersion()).withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
                .withHadoopJarStep(stepFactory.newInstallPigStep(emrClusterDefinition.getPigVersion()));
        appSteps.add(installPig);
    }

    // Add the hadoop jar steps that need to be added.
    if (!CollectionUtils.isEmpty(emrClusterDefinition.getHadoopJarSteps()))
    {
        for (HadoopJarStep hadoopJarStep : emrClusterDefinition.getHadoopJarSteps())
        {
            StepConfig stepConfig = emrHelper
                .getEmrHadoopJarStepConfig(hadoopJarStep.getStepName(), hadoopJarStep.getJarLocation(), hadoopJarStep.getMainClass(),
                    hadoopJarStep.getScriptArguments(), hadoopJarStep.isContinueOnError());

            appSteps.add(stepConfig);
        }
    }

    return appSteps;
}
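Downstream, the list built here would presumably be attached to the cluster launch request. A minimal sketch, with the request name hypothetical and the definition assumed in scope:

// Sketch: wiring the generated steps into a RunJobFlowRequest (names assumed).
RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("my-emr-cluster")                        // hypothetical cluster name
    .withSteps(getStepConfig(emrClusterDefinition));   // the list built above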
Project: herd    File: EmrHelperTest.java
@Test
public void testEmrHadoopJarStepConfig() throws Exception
{
    StepConfig stepConfig = emrHelper.getEmrHadoopJarStepConfig("step_name", "jar_location", null, null, false);

    assertNotNull("step not retuned", stepConfig);

    assertEquals("name not found", "step_name", stepConfig.getName());
    assertEquals("jar not found", "jar_location", stepConfig.getHadoopJarStep().getJar());
}
Project: herd    File: EmrHelperTest.java
@Test
public void testEmrHadoopJarStepConfigNoContinueOnError() throws Exception
{
    StepConfig stepConfig = emrHelper.getEmrHadoopJarStepConfig("step_name", "jar_location", null, null, null);

    assertNotNull("step not retuned", stepConfig);

    assertEquals("name not found", "step_name", stepConfig.getName());
    assertEquals("jar not found", "jar_location", stepConfig.getHadoopJarStep().getJar());
}
Project: herd    File: EmrHelperTest.java
@Test
public void testEmrHadoopJarStepConfigContinueOnError() throws Exception
{
    StepConfig stepConfig = emrHelper.getEmrHadoopJarStepConfig("step_name", "jar_location", null, null, true);

    assertNotNull("step not retuned", stepConfig);

    assertEquals("name not found", "step_name", stepConfig.getName());
    assertEquals("jar not found", "jar_location", stepConfig.getHadoopJarStep().getJar());
}
Project: herd    File: EmrHelperTest.java
@Test
public void testEmrHadoopJarStepConfigWithArguments() throws Exception
{
    List<String> arguments = new ArrayList<>();
    arguments.add("arg1");

    StepConfig stepConfig = emrHelper.getEmrHadoopJarStepConfig("step_name", "jar_location", null, arguments, false);

    assertNotNull("step not retuned", stepConfig);

    assertEquals("name not found", "step_name", stepConfig.getName());
    assertEquals("jar not found", "jar_location", stepConfig.getHadoopJarStep().getJar());
    assertNotNull("arguments not found", stepConfig.getHadoopJarStep().getArgs());
}
Project: herd    File: MockEmrOperationsImpl.java
private MockEmrJobFlow addClusterStep(String jobFlowId, StepConfig step)
{
    List<MockEmrJobFlow> mockSteps = getStepsByClusterId(jobFlowId);
    if (mockSteps == null)
    {
        mockSteps = new ArrayList<>();
    }

    MockEmrJobFlow mockStep = new MockEmrJobFlow();
    if (!step.getName().equalsIgnoreCase(MOCK_STEP_RUNNING_WITHOUT_ID_NAME))
    {
        mockStep.setJobFlowId(getNewJobFlowId());
    }
    mockStep.setJobFlowName(step.getName());
    if (step.getName().equalsIgnoreCase(MOCK_STEP_RUNNING_NAME) || step.getName().equalsIgnoreCase(MOCK_STEP_RUNNING_WITHOUT_ID_NAME))
    {
        mockStep.setStatus(StepState.RUNNING.toString());
    }
    else
    {
        mockStep.setStatus(StepState.PENDING.toString());
    }
    mockStep.setJarLocation(step.getHadoopJarStep().getJar());

    mockSteps.add(mockStep);
    setStepsByClusterId(jobFlowId, mockSteps);
    return mockStep;
}
Project: digdag    File: EmrOperatorFactory.java
private void addStep(String name, CommandRunnerConfiguration configuration)
        throws IOException
{
    FileReference configurationFileReference = ImmutableFileReference.builder()
            .type(FileReference.Type.DIRECT)
            .contents(objectMapper.writeValueAsBytes(configuration))
            .filename("config.json")
            .build();
    RemoteFile remoteConfigurationFile = prepareRemoteFile(configurationFileReference, false);

    StepConfig runStep = stepConfig(name, tag, step)
            .withHadoopJarStep(stepFactory().newScriptRunnerStep(runner.s3Uri().toString(), remoteConfigurationFile.s3Uri().toString()));

    configs.add(runStep);
}
Project: digdag    File: EmrOperatorFactory.java
private StepConfig stepConfig(String defaultName, String tag, Config step)
{
    String name = step.get("name", String.class, defaultName);
    return new StepConfig()
            .withName(name + " (" + tag + ")")
            // TERMINATE_JOB_FLOW | TERMINATE_CLUSTER | CANCEL_AND_WAIT | CONTINUE
            .withActionOnFailure(step.get("action_on_failure", String.class, defaultActionOnFailure));
}
Project: aws-big-data-blog    File: EMRUtils.java
/**
 * This is a helper method for creating step configuration information.
 * @param stepName - a custom name to label this step
 * @param actionOnFailure - options are terminate cluster, terminate job flow, continue
 * @param jarPath - path to the jar file - could be on S3 or the local file system
 * @param args - list of Java args to configure the custom step
 * @return the assembled StepConfig
 */
private static StepConfig createStepConfig(String stepName, String actionOnFailure, String jarPath, List<String> args) {
    StepConfig stepConfig = new StepConfig()
        .withName(stepName)
        .withActionOnFailure(actionOnFailure)
        .withHadoopJarStep(new HadoopJarStepConfig()
            .withJar(jarPath)
            .withArgs(args));
    return stepConfig;
}
Project: aws-big-data-blog    File: LambdaContainer.java
protected String fireEMRJob(String paramsStr,String clusterId){
    StepFactory stepFactory = new StepFactory();
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    emr.setRegion(Region.getRegion(Regions.fromName(System.getenv().get("AWS_REGION"))));
    Application sparkConfig = new Application()
            .withName("Spark");

    String[] params = paramsStr.split(",");
    StepConfig enableDebugging = new StepConfig()
            .withName("Enable debugging")
            .withActionOnFailure("TERMINATE_JOB_FLOW")
            .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

    HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs(params);

    final StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);

    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest(clusterId)
            .withSteps(enableDebugging, sparkStep);

    AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
    // Step IDs come back in the order the steps were added, so index 1 is the Spark step.
    return result.getStepIds().get(1);
}
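The comma-separated paramsStr is what becomes the command-runner.jar argument vector. A sketch with an invented Spark submission string; all values are illustrative:

// Hypothetical input string; all values are illustrative.
String paramsStr = "spark-submit,--class,com.example.Main,s3://my-bucket/app.jar";
String[] params = paramsStr.split(",");
// params -> ["spark-submit", "--class", "com.example.Main", "s3://my-bucket/app.jar"],
// which command-runner.jar executes on the cluster.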
Project: herd    File: EmrDaoTest.java
@Test
public void createEmrClusterAssertCallRunEmrJobFlowWithInstanceFleetAndMultipleSubnets() throws Exception
{
    // Create objects required for testing.
    final String clusterName = "clusterName";
    final String clusterId = "clusterId";
    final String name = STRING_VALUE;
    final String instanceFleetType = STRING_VALUE_2;
    final Integer targetOnDemandCapacity = INTEGER_VALUE;
    final Integer targetSpotCapacity = INTEGER_VALUE_2;
    final List<EmrClusterDefinitionInstanceTypeConfig> emrClusterDefinitionInstanceTypeConfigs = null;
    final EmrClusterDefinitionLaunchSpecifications emrClusterDefinitionLaunchSpecifications = null;
    final EmrClusterDefinitionInstanceFleet emrClusterDefinitionInstanceFleet =
        new EmrClusterDefinitionInstanceFleet(name, instanceFleetType, targetOnDemandCapacity, targetSpotCapacity, emrClusterDefinitionInstanceTypeConfigs,
            emrClusterDefinitionLaunchSpecifications);

    // Create an EMR cluster definition with instance fleet configuration and multiple EC2 subnet IDs.
    EmrClusterDefinition emrClusterDefinition = new EmrClusterDefinition();
    emrClusterDefinition.setInstanceFleets(Arrays.asList(emrClusterDefinitionInstanceFleet));
    emrClusterDefinition.setSubnetId(String.format("%s , %s  ", EC2_SUBNET, EC2_SUBNET_2));
    emrClusterDefinition.setNodeTags(Arrays.asList(new NodeTag("tagName", "tagValue")));

    when(mockEmrOperations.runEmrJobFlow(any(), any())).then(new Answer<String>()
    {
        @Override
        public String answer(InvocationOnMock invocation) throws Throwable
        {
            // Assert that the given EMR cluster definition produced the correct RunJobFlowRequest.
            RunJobFlowRequest runJobFlowRequest = invocation.getArgument(1);
            JobFlowInstancesConfig jobFlowInstancesConfig = runJobFlowRequest.getInstances();
            assertEquals(0, CollectionUtils.size(jobFlowInstancesConfig.getInstanceGroups()));
            final List<InstanceTypeConfig> expectedInstanceTypeConfigs = null;
            assertEquals(Arrays.asList(
                new InstanceFleetConfig().withName(name).withInstanceFleetType(instanceFleetType).withTargetOnDemandCapacity(targetOnDemandCapacity)
                    .withTargetSpotCapacity(targetSpotCapacity).withInstanceTypeConfigs(expectedInstanceTypeConfigs).withLaunchSpecifications(null)),
                jobFlowInstancesConfig.getInstanceFleets());
            assertNull(jobFlowInstancesConfig.getEc2SubnetId());
            assertEquals(2, CollectionUtils.size(jobFlowInstancesConfig.getEc2SubnetIds()));
            assertTrue(jobFlowInstancesConfig.getEc2SubnetIds().contains(EC2_SUBNET));
            assertTrue(jobFlowInstancesConfig.getEc2SubnetIds().contains(EC2_SUBNET_2));
            assertEquals(herdStringHelper.getRequiredConfigurationValue(ConfigurationValue.EMR_DEFAULT_EC2_NODE_IAM_PROFILE_NAME),
                runJobFlowRequest.getJobFlowRole());
            assertEquals(herdStringHelper.getRequiredConfigurationValue(ConfigurationValue.EMR_DEFAULT_SERVICE_IAM_ROLE_NAME),
                runJobFlowRequest.getServiceRole());
            List<StepConfig> stepConfigs = runJobFlowRequest.getSteps();
            assertEquals(0, stepConfigs.size());
            List<Tag> tags = runJobFlowRequest.getTags();
            assertEquals(1, tags.size());
            {
                Tag tag = tags.get(0);
                assertEquals("tagName", tag.getKey());
                assertEquals("tagValue", tag.getValue());
            }

            return clusterId;
        }
    });

    assertEquals(clusterId, emrDao.createEmrCluster(clusterName, emrClusterDefinition, new AwsParamsDto()));
}
Project: digdag    File: EmrOperatorFactory.java
List<StepConfig> stepConfigs()
{
    Preconditions.checkState(configs != null);
    return configs;
}
Project: aws-big-data-blog    File: EMRUtils.java
/**
 * This method uses the AWS Java SDK to launch an Apache HBase cluster on Amazon EMR.
 *
 * @param client - AmazonElasticMapReduce client that interfaces directly with the Amazon EMR Web Service
 * @param clusterIdentifier - identifier of an existing cluster
 * @param amiVersion - AMI to use for launching this cluster
 * @param keypair - a keypair for SSHing into the Amazon EMR master node
 * @param masterInstanceType - master node Amazon EC2 instance type
 * @param coreInstanceType - core nodes Amazon EC2 instance type
 * @param logUri - an Amazon S3 bucket for your logs
 * @param numberOfNodes - total number of nodes in this cluster including master node
 * @return the job flow identifier of the available cluster
 */
public static String createCluster(AmazonElasticMapReduce client,
        String clusterIdentifier,
        String amiVersion,
        String keypair,
        String masterInstanceType,
        String coreInstanceType,
        String logUri,
        int numberOfNodes) {

    if (clusterExists(client, clusterIdentifier)) {
        LOG.info("Cluster " + clusterIdentifier + " is available");
        return clusterIdentifier;
    }

    //Error checking
    if (amiVersion == null || amiVersion.isEmpty()) throw new RuntimeException("ERROR: Please specify an AMI Version");
    if (keypair == null || keypair.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair");
    if (masterInstanceType == null || masterInstanceType.isEmpty()) throw new RuntimeException("ERROR: Please specify a Master Instance Type");
    if (logUri == null || logUri.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs.");
    if (numberOfNodes < 1) throw new RuntimeException("ERROR: Please specify at least 1 node");

    RunJobFlowRequest request = new RunJobFlowRequest()
        .withAmiVersion(amiVersion)
        .withBootstrapActions(new BootstrapActionConfig()
            .withName("Install HBase")
            .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase")))
        .withName("Job Flow With HBase Actions")
        .withSteps(
            // Enable debugging step
            new StepConfig()
                .withName("Enable debugging")
                .withActionOnFailure("TERMINATE_CLUSTER")
                .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()),
            // Start HBase step - after installing it with a bootstrap action
            createStepConfig("Start HBase", "TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()),
            // Add the HBase backup step
            createStepConfig("Modify backup schedule", "TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs()))
        .withLogUri(logUri)
        .withInstances(new JobFlowInstancesConfig()
            .withEc2KeyName(keypair)
            .withInstanceCount(numberOfNodes)
            .withKeepJobFlowAliveWhenNoSteps(true)
            .withMasterInstanceType(masterInstanceType)
            .withSlaveInstanceType(coreInstanceType));

    RunJobFlowResult result = client.runJobFlow(request);

    String state = null;
    while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) {
        try {
            Thread.sleep(10 * 1000);
            LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        }

        if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")) {
            LOG.error("Could not create EMR Cluster");
            System.exit(-1);
        }
    }
    LOG.info("Created cluster " + result.getJobFlowId());
    LOG.info("Cluster " + clusterIdentifier + " is available"); 
    return result.getJobFlowId();
}
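A hedged invocation sketch for this helper; the client construction and every argument value below are hypothetical, not taken from the project:

// Illustrative call only; values are not from the project.
AmazonElasticMapReduce client = AmazonElasticMapReduceClientBuilder.defaultClient();
String jobFlowId = EMRUtils.createCluster(client,
    "my-hbase-cluster",     // clusterIdentifier
    "3.1.0",                // amiVersion
    "my-ec2-keypair",       // keypair
    "m3.xlarge",            // masterInstanceType
    "m3.xlarge",            // coreInstanceType
    "s3://my-bucket/logs/", // logUri
    3);                     // numberOfNodes, master included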
Project: eoulsan    File: AWSElasticMapReduceJob.java
void init() {

    requireNonNull(this.AWSAccessKey);
    requireNonNull(this.AWSSecretKey);
    requireNonNull(this.jarLocation);
    requireNonNull(this.jarArguments);
    requireNonNull(this.slavesInstanceType);
    requireNonNull(this.hadoopVersion);
    requireNonNull(this.jobFlowName);

    if (this.nInstances < 1) {
      throw new IllegalArgumentException(
          "the number of instances is lower than 1");
    }

    if (this.masterInstanceType == null) {
      this.masterInstanceType = this.slavesInstanceType;
    }

    // Set the hadoop jar step
    final HadoopJarStepConfig hadoopJarStep = new HadoopJarStepConfig()
        .withJar(this.jarLocation.trim()).withArgs(this.jarArguments);

    // Set step config
    final StepConfig stepConfig = new StepConfig()
        .withName(this.jobFlowName + "-step").withHadoopJarStep(hadoopJarStep)
        .withActionOnFailure("TERMINATE_JOB_FLOW");

    // Set the instance
    final JobFlowInstancesConfig instances =
        new JobFlowInstancesConfig().withInstanceCount(this.nInstances)
            .withMasterInstanceType(this.masterInstanceType)
            .withSlaveInstanceType(this.slavesInstanceType)
            .withHadoopVersion(this.hadoopVersion);

    // Configure hadoop
    final ScriptBootstrapActionConfig scriptBootstrapAction =
        new ScriptBootstrapActionConfig()
            .withPath(
                "s3n://eu-west-1.elasticmapreduce/bootstrap-actions/configure-hadoop")
            .withArgs("--site-key-value",
                "mapreduce.tasktracker.map.tasks.maximum="
                    + this.taskTrackerMaxMapTasks);

    final BootstrapActionConfig bootstrapActions =
        new BootstrapActionConfig().withName("Configure hadoop")
            .withScriptBootstrapAction(scriptBootstrapAction);

    // Enable debugging
    StepFactory stepFactory = new StepFactory();
    StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

    // Run flow
    this.runFlowRequest = new RunJobFlowRequest().withName(this.jobFlowName);

    // Enable or not debugging
    if (this.enableDebugging) {
      this.runFlowRequest.withInstances(instances).withSteps(enableDebugging,
          stepConfig);
    } else {
      this.runFlowRequest.withInstances(instances).withSteps(stepConfig);
    }

    // Limit the number of task in a task tracker
    if (this.taskTrackerMaxMapTasks > 0) {
      this.runFlowRequest.withBootstrapActions(bootstrapActions);
    }

    if (this.logPathname != null && !"".equals(this.logPathname)) {
      this.runFlowRequest.withLogUri(this.logPathname);
    }

    // Set EC2 Key name
    if (this.ec2KeyName != null) {
      this.runFlowRequest.getInstances().setEc2KeyName(this.ec2KeyName);
    }
  }
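Note that init() only assembles runFlowRequest; actually submitting it would presumably look like the following sketch. The client construction and credential wiring here are assumptions, not the project's code:

// Hedged sketch: launch the request built by init().
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard().build();
RunJobFlowResult result = emr.runJobFlow(this.runFlowRequest);
String jobFlowId = result.getJobFlowId();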
Project: herd    File: EmrStepHelper.java
/**
 * This method gets the StepConfig object for the given Step.
 *
 * @param step the step object
 *
 * @return the step config object
 */
public abstract StepConfig getEmrStepConfig(Object step);
Project: herd    File: EmrDao.java
/**
 * Adds an EMR step. This method adds the given step to the EMR cluster.
 *
 * @param clusterId EMR cluster ID.
 * @param emrStepConfig the EMR step config to be added.
 * @param awsParamsDto the proxy details.
 * <p/>
 * Four serializable step types are currently supported: 1) ShellStep for shell scripts, 2) HiveStep for Hive scripts, 3) HadoopJarStep for custom
 * MapReduce JAR files, and 4) PigStep for Pig scripts.
 *
 * @return the step id
 */
public String addEmrStep(String clusterId, StepConfig emrStepConfig, AwsParamsDto awsParamsDto) throws Exception;
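Putting the herd pieces together, a hedged end-to-end sketch combining the helper and DAO shown above; emrHelper, emrDao, and awsParamsDto are assumed to be injected Spring beans and DTOs, and the IDs and paths are hypothetical:

// Illustrative wiring only; names and values are assumptions.
StepConfig stepConfig = emrHelper.getEmrHadoopJarStepConfig(
    "my-step",                         // stepName
    "s3://my-bucket/jars/app.jar",     // jarLocation
    "com.example.Main",                // mainClass
    Arrays.asList("arg1", "arg2"),     // scriptArguments
    false);                            // isContinueOnError
String stepId = emrDao.addEmrStep("j-XXXXXXXXXXXXX", stepConfig, awsParamsDto);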