@Test
public void test()
        throws Exception
{
    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Digdag Test")
            .withReleaseLabel("emr-5.2.0")
            .withApplications(Stream.of("Hadoop", "Hive", "Spark", "Flink")
                    .map(s -> new Application().withName(s))
                    .collect(toList()))
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withServiceRole("EMR_DefaultRole")
            .withVisibleToAllUsers(true)
            .withLogUri(tmpS3FolderUri + "/logs/")
            .withInstances(new JobFlowInstancesConfig()
                    .withEc2KeyName("digdag-test")
                    .withInstanceCount(1)
                    .withKeepJobFlowAliveWhenNoSteps(true)
                    .withMasterInstanceType("m3.xlarge")
                    .withSlaveInstanceType("m3.xlarge"));

    RunJobFlowResult result = emr.runJobFlow(request);
    String clusterId = result.getJobFlowId();
    clusterIds.add(clusterId);

    Id attemptId = pushAndStart(server.endpoint(), projectDir, "emr", ImmutableMap.of(
            "test_s3_folder", tmpS3FolderUri.toString(),
            "test_cluster", clusterId,
            "outfile", outfile.toString()));
    expect(Duration.ofMinutes(30), attemptSuccess(server.endpoint(), attemptId));

    validateTdSparkQueryOutput();

    assertThat(Files.exists(outfile), is(true));
}
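// The test registers each job flow ID in clusterIds so the clusters can be torn
// down after the run. A minimal teardown sketch, assuming a JUnit @After hook and
// the same AmazonElasticMapReduce client (emr) and clusterIds list as above; the
// hook itself is illustrative and not part of the original test:
@After
public void terminateClusters()
{
    if (!clusterIds.isEmpty()) {
        // Terminate all clusters started by the test in one call.
        emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(clusterIds));
    }
}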
/**
 * Creates the job flow instance configuration containing the specification of the number and type of Amazon EC2 instances.
 *
 * @param emrClusterDefinition the EMR cluster definition that contains all the EMR parameters
 *
 * @return the job flow instance configuration
 */
private JobFlowInstancesConfig getJobFlowInstancesConfig(EmrClusterDefinition emrClusterDefinition)
{
    // Create a new job flow instances configuration object.
    JobFlowInstancesConfig jobFlowInstancesConfig = new JobFlowInstancesConfig();

    // Set up the master/slave security groups.
    jobFlowInstancesConfig.setEmrManagedMasterSecurityGroup(emrClusterDefinition.getMasterSecurityGroup());
    jobFlowInstancesConfig.setEmrManagedSlaveSecurityGroup(emrClusterDefinition.getSlaveSecurityGroup());

    // Add additional security groups to master nodes.
    jobFlowInstancesConfig.setAdditionalMasterSecurityGroups(emrClusterDefinition.getAdditionalMasterSecurityGroups());

    // Add additional security groups to slave nodes.
    jobFlowInstancesConfig.setAdditionalSlaveSecurityGroups(emrClusterDefinition.getAdditionalSlaveSecurityGroups());

    // Fill in the SSH key.
    if (StringUtils.isNotBlank(emrClusterDefinition.getSshKeyPairName()))
    {
        jobFlowInstancesConfig.setEc2KeyName(emrClusterDefinition.getSshKeyPairName());
    }

    // Fill in the configuration for the instance groups in the cluster.
    jobFlowInstancesConfig.setInstanceGroups(getInstanceGroupConfigs(emrClusterDefinition.getInstanceDefinitions()));

    // Fill in the instance fleet configuration.
    jobFlowInstancesConfig.setInstanceFleets(getInstanceFleets(emrClusterDefinition.getInstanceFleets()));

    // Fill in the subnet id.
    if (StringUtils.isNotBlank(emrClusterDefinition.getSubnetId()))
    {
        // Use a collection of subnet IDs when an instance fleet configuration is specified. Otherwise, we expect a single EC2 subnet ID to be passed here.
        if (CollectionUtils.isNotEmpty(jobFlowInstancesConfig.getInstanceFleets()))
        {
            jobFlowInstancesConfig.setEc2SubnetIds(herdStringHelper.splitAndTrim(emrClusterDefinition.getSubnetId(), ","));
        }
        else
        {
            jobFlowInstancesConfig.setEc2SubnetId(emrClusterDefinition.getSubnetId());
        }
    }

    // Fill in the optional keep alive flag.
    if (emrClusterDefinition.isKeepAlive() != null)
    {
        jobFlowInstancesConfig.setKeepJobFlowAliveWhenNoSteps(emrClusterDefinition.isKeepAlive());
    }

    // Fill in the optional termination protection flag.
    if (emrClusterDefinition.isTerminationProtection() != null)
    {
        jobFlowInstancesConfig.setTerminationProtected(emrClusterDefinition.isTerminationProtection());
    }

    // Fill in the optional Hadoop version.
    if (StringUtils.isNotBlank(emrClusterDefinition.getHadoopVersion()))
    {
        jobFlowInstancesConfig.setHadoopVersion(emrClusterDefinition.getHadoopVersion());
    }

    // Return the configuration.
    return jobFlowInstancesConfig;
}
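// A minimal sketch of how the returned configuration plugs into a cluster
// request from inside the same class; the cluster name is illustrative and the
// emrClusterDefinition variable is assumed to be a populated definition:
RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest()
        .withName("my-emr-cluster")
        .withInstances(getJobFlowInstancesConfig(emrClusterDefinition));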
@Test
public void createEmrClusterAssertCallRunEmrJobFlowWithInstanceFleetAndMultipleSubnets() throws Exception
{
    // Create objects required for testing.
    final String clusterName = "clusterName";
    final String clusterId = "clusterId";
    final String name = STRING_VALUE;
    final String instanceFleetType = STRING_VALUE_2;
    final Integer targetOnDemandCapacity = INTEGER_VALUE;
    final Integer targetSpotCapacity = INTEGER_VALUE_2;
    final List<EmrClusterDefinitionInstanceTypeConfig> emrClusterDefinitionInstanceTypeConfigs = null;
    final EmrClusterDefinitionLaunchSpecifications emrClusterDefinitionLaunchSpecifications = null;
    final EmrClusterDefinitionInstanceFleet emrClusterDefinitionInstanceFleet =
        new EmrClusterDefinitionInstanceFleet(name, instanceFleetType, targetOnDemandCapacity, targetSpotCapacity,
            emrClusterDefinitionInstanceTypeConfigs, emrClusterDefinitionLaunchSpecifications);

    // Create an EMR cluster definition with instance fleet configuration and multiple EC2 subnet IDs.
    EmrClusterDefinition emrClusterDefinition = new EmrClusterDefinition();
    emrClusterDefinition.setInstanceFleets(Arrays.asList(emrClusterDefinitionInstanceFleet));
    emrClusterDefinition.setSubnetId(String.format("%s , %s ", EC2_SUBNET, EC2_SUBNET_2));
    emrClusterDefinition.setNodeTags(Arrays.asList(new NodeTag("tagName", "tagValue")));

    when(mockEmrOperations.runEmrJobFlow(any(), any())).then(new Answer<String>()
    {
        @Override
        public String answer(InvocationOnMock invocation) throws Throwable
        {
            // Assert that the given EMR cluster definition produced the correct RunJobFlowRequest.
            RunJobFlowRequest runJobFlowRequest = invocation.getArgument(1);
            JobFlowInstancesConfig jobFlowInstancesConfig = runJobFlowRequest.getInstances();
            assertEquals(0, CollectionUtils.size(jobFlowInstancesConfig.getInstanceGroups()));
            final List<InstanceTypeConfig> expectedInstanceTypeConfigs = null;
            assertEquals(Arrays.asList(
                new InstanceFleetConfig().withName(name).withInstanceFleetType(instanceFleetType).withTargetOnDemandCapacity(targetOnDemandCapacity)
                    .withTargetSpotCapacity(targetSpotCapacity).withInstanceTypeConfigs(expectedInstanceTypeConfigs).withLaunchSpecifications(null)),
                jobFlowInstancesConfig.getInstanceFleets());
            assertNull(jobFlowInstancesConfig.getEc2SubnetId());
            assertEquals(2, CollectionUtils.size(jobFlowInstancesConfig.getEc2SubnetIds()));
            assertTrue(jobFlowInstancesConfig.getEc2SubnetIds().contains(EC2_SUBNET));
            assertTrue(jobFlowInstancesConfig.getEc2SubnetIds().contains(EC2_SUBNET_2));
            assertEquals(herdStringHelper.getRequiredConfigurationValue(ConfigurationValue.EMR_DEFAULT_EC2_NODE_IAM_PROFILE_NAME),
                runJobFlowRequest.getJobFlowRole());
            assertEquals(herdStringHelper.getRequiredConfigurationValue(ConfigurationValue.EMR_DEFAULT_SERVICE_IAM_ROLE_NAME),
                runJobFlowRequest.getServiceRole());

            List<StepConfig> stepConfigs = runJobFlowRequest.getSteps();
            assertEquals(0, stepConfigs.size());

            List<Tag> tags = runJobFlowRequest.getTags();
            assertEquals(1, tags.size());
            {
                Tag tag = tags.get(0);
                assertEquals("tagName", tag.getKey());
                assertEquals("tagValue", tag.getValue());
            }

            return clusterId;
        }
    });

    assertEquals(clusterId, emrDao.createEmrCluster(clusterName, emrClusterDefinition, new AwsParamsDto()));
}
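// The test formats the subnet IDs as "subnet-1 , subnet-2 " on purpose: the DAO
// is expected to split on commas and trim whitespace before setting
// ec2SubnetIds. A standalone sketch of that expected behavior (illustrative;
// the production code delegates to herdStringHelper.splitAndTrim):
List<String> subnetIds = Arrays.stream("subnet-1 , subnet-2 ".split(","))
        .map(String::trim)
        .collect(Collectors.toList());
// subnetIds -> ["subnet-1", "subnet-2"]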
/**
 * This method uses the AWS SDK for Java to launch an Apache HBase cluster on Amazon EMR.
 *
 * @param client - AmazonElasticMapReduce client that interfaces directly with the Amazon EMR Web Service
 * @param clusterIdentifier - identifier of an existing cluster
 * @param amiVersion - AMI to use for launching this cluster
 * @param keypair - A keypair for SSHing into the Amazon EMR master node
 * @param masterInstanceType - Master node Amazon EC2 instance type
 * @param coreInstanceType - core nodes Amazon EC2 instance type
 * @param logUri - An Amazon S3 bucket for your logs
 * @param numberOfNodes - total number of nodes in this cluster including master node
 * @return the job flow ID of the new or existing cluster
 */
public static String createCluster(AmazonElasticMapReduce client, String clusterIdentifier, String amiVersion, String keypair,
    String masterInstanceType, String coreInstanceType, String logUri, int numberOfNodes) {

    if (clusterExists(client, clusterIdentifier)) {
        LOG.info("Cluster " + clusterIdentifier + " is available");
        return clusterIdentifier;
    }

    // Error checking
    if (amiVersion == null || amiVersion.isEmpty())
        throw new RuntimeException("ERROR: Please specify an AMI Version");
    if (keypair == null || keypair.isEmpty())
        throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair");
    if (masterInstanceType == null || masterInstanceType.isEmpty())
        throw new RuntimeException("ERROR: Please specify a Master Instance Type");
    if (logUri == null || logUri.isEmpty())
        throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs.");
    if (numberOfNodes < 1)
        throw new RuntimeException("ERROR: Please specify at least 1 node");

    RunJobFlowRequest request = new RunJobFlowRequest()
        .withAmiVersion(amiVersion)
        .withBootstrapActions(new BootstrapActionConfig()
            .withName("Install HBase")
            .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase")))
        .withName("Job Flow With HBase Actions")
        .withSteps(
            // Enable debugging step
            new StepConfig()
                .withName("Enable debugging")
                .withActionOnFailure("TERMINATE_CLUSTER")
                .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()),
            // Start HBase step - after installing it with a bootstrap action
            createStepConfig("Start HBase", "TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()),
            // Add HBase backup step
            createStepConfig("Modify backup schedule", "TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs()))
        .withLogUri(logUri)
        .withInstances(new JobFlowInstancesConfig()
            .withEc2KeyName(keypair)
            .withInstanceCount(numberOfNodes)
            .withKeepJobFlowAliveWhenNoSteps(true)
            .withMasterInstanceType(masterInstanceType)
            .withSlaveInstanceType(coreInstanceType));

    RunJobFlowResult result = client.runJobFlow(request);

    // Poll until the cluster reaches the WAITING state or fails.
    String state = null;
    while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) {
        try {
            Thread.sleep(10 * 1000);
            LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available.");
        } catch (InterruptedException e) {
            // Restore the interrupt status and keep polling.
            Thread.currentThread().interrupt();
        }
        if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")) {
            LOG.error("Could not create EMR Cluster");
            System.exit(-1);
        }
    }
    LOG.info("Created cluster " + result.getJobFlowId());
    LOG.info("Cluster " + clusterIdentifier + " is available");
    return result.getJobFlowId();
}
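// A minimal sketch of calling the helper above; the client construction and
// all argument values (cluster name, AMI version, key pair, instance types,
// log bucket, node count) are illustrative:
AmazonElasticMapReduce emrClient = AmazonElasticMapReduceClientBuilder.defaultClient();
String jobFlowId = createCluster(emrClient, "hbase-cluster", "3.1.1", "my-keypair",
    "m1.large", "m1.large", "s3://my-bucket/emr-logs/", 3);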
void init() {

    requireNonNull(this.AWSAccessKey);
    requireNonNull(this.AWSSecretKey); // assumed field name for the secret key; the source checked AWSAccessKey twice
    requireNonNull(this.jarLocation);
    requireNonNull(this.jarArguments);
    requireNonNull(this.slavesInstanceType);
    requireNonNull(this.hadoopVersion);
    requireNonNull(this.jobFlowName);

    if (this.nInstances < 1) {
        throw new IllegalArgumentException("the number of instances is lower than 1");
    }

    if (this.masterInstanceType == null) {
        this.masterInstanceType = this.slavesInstanceType;
    }

    // Set the hadoop jar step
    final HadoopJarStepConfig hadoopJarStep = new HadoopJarStepConfig()
        .withJar(this.jarLocation.trim())
        .withArgs(this.jarArguments);

    // Set the step config
    final StepConfig stepConfig = new StepConfig()
        .withName(this.jobFlowName + "-step")
        .withHadoopJarStep(hadoopJarStep)
        .withActionOnFailure("TERMINATE_JOB_FLOW");

    // Set the instances
    final JobFlowInstancesConfig instances =
        new JobFlowInstancesConfig().withInstanceCount(this.nInstances)
            .withMasterInstanceType(this.masterInstanceType)
            .withSlaveInstanceType(this.slavesInstanceType)
            .withHadoopVersion(this.hadoopVersion);

    // Configure hadoop
    final ScriptBootstrapActionConfig scriptBootstrapAction =
        new ScriptBootstrapActionConfig()
            .withPath("s3n://eu-west-1.elasticmapreduce/bootstrap-actions/configure-hadoop")
            .withArgs("--site-key-value",
                "mapreduce.tasktracker.map.tasks.maximum=" + this.taskTrackerMaxMapTasks);

    final BootstrapActionConfig bootstrapActions =
        new BootstrapActionConfig().withName("Configure hadoop")
            .withScriptBootstrapAction(scriptBootstrapAction);

    // Enable debugging
    final StepFactory stepFactory = new StepFactory();
    final StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

    // Create the run request
    this.runFlowRequest = new RunJobFlowRequest().withName(this.jobFlowName);

    // Enable debugging or not
    if (this.enableDebugging) {
        this.runFlowRequest.withInstances(instances).withSteps(enableDebugging, stepConfig);
    } else {
        this.runFlowRequest.withInstances(instances).withSteps(stepConfig);
    }

    // Limit the number of map tasks per task tracker
    if (this.taskTrackerMaxMapTasks > 0) {
        this.runFlowRequest.withBootstrapActions(bootstrapActions);
    }

    if (this.logPathname != null && !"".equals(this.logPathname)) {
        this.runFlowRequest.withLogUri(this.logPathname);
    }

    // Set the EC2 key name
    if (this.ec2KeyName != null) {
        this.runFlowRequest.getInstances().setEc2KeyName(this.ec2KeyName);
    }
}
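// init() only builds this.runFlowRequest; a minimal sketch of actually
// submitting it, assuming the credential fields checked above. The client
// setup is illustrative (classic SDK v1 builder), and the region is assumed
// to match the eu-west-1 bootstrap action used above:
AWSCredentials credentials = new BasicAWSCredentials(this.AWSAccessKey, this.AWSSecretKey);
AmazonElasticMapReduce emrClient = AmazonElasticMapReduceClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(credentials))
    .withRegion("eu-west-1")
    .build();
// Submit the prepared request and keep the job flow ID for later polling.
String jobFlowId = emrClient.runJobFlow(this.runFlowRequest).getJobFlowId();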