private List<StepSummary> listSubmittedSteps(AmazonElasticMapReduce emr, String tag, NewCluster cluster) { List<StepSummary> steps = new ArrayList<>(); ListStepsRequest request = new ListStepsRequest().withClusterId(cluster.id()); while (steps.size() < cluster.steps()) { ListStepsResult result = emr.listSteps(request); for (StepSummary step : result.getSteps()) { if (step.getName().contains(tag)) { steps.add(step); } } if (result.getMarker() == null) { break; } request.setMarker(result.getMarker()); } // The ListSteps api returns steps in reverse order. So reverse them to submission order. Collections.reverse(steps); return steps; }
/**
 * Collect data for ElasticMapReduce.
 *
 * Lists the EMR clusters visible to the account in the given region and records one
 * {@link AwsResource} per cluster in {@code stats}. Service errors are logged, not rethrown.
 *
 * @param stats
 *            current statistics object.
 * @param account
 *            currently used credentials object.
 * @param region
 *            currently used aws region.
 */
public static void scanElasticMapReduce(AwsStats stats, AwsAccount account, Regions region) {
    LOG.debug("Scan for MapReduce in region " + region.getName() + " in account " + account.getAccountId());

    try {
        AmazonElasticMapReduce elasticMapReduce = new AmazonElasticMapReduceClient(account.getCredentials());
        elasticMapReduce.setRegion(Region.getRegion(region));

        // NOTE(review): only the clusters returned by this single listClusters() call are counted;
        // confirm whether paging via the result marker is needed for accounts with many clusters.
        List<ClusterSummary> list = elasticMapReduce.listClusters().getClusters();

        int totalItems = list.size();
        for (ClusterSummary cs : list) {
            stats.add(new AwsResource(cs.getName(), account.getAccountId(), AwsResourceType.ElasticMapReduce, region));
        }

        LOG.info(totalItems + " ElasticMapReduce clusters in region " + region.getName() + " in account " + account.getAccountId());
    } catch (AmazonServiceException ase) {
        // The error code may be absent; guard so the handler itself cannot fail with an NPE.
        String errorCode = ase.getErrorCode();
        if (errorCode != null && errorCode.contains("AccessDenied")) {
            LOG.info("Access denied for ElasticMapReduce in region " + region.getName() + " in account " + account.getAccountId());
        } else {
            // Pass the exception along so the stack trace is not lost.
            LOG.error("Exception of ElasticMapReduce: " + ase.getMessage(), ase);
        }
    }
}
private TaskResult run(String tag, AmazonElasticMapReduce emr, AWSKMSClient kms, Filer filer) throws IOException { ParameterCompiler parameterCompiler = new ParameterCompiler(kms, context); // Set up step compiler List<Config> steps = params.getListOrEmpty("steps", Config.class); StepCompiler stepCompiler = new StepCompiler(tag, steps, filer, parameterCompiler, objectMapper, defaultActionOnFailure); // Set up job submitter Submitter submitter; Config cluster = null; try { cluster = params.parseNestedOrGetEmpty("cluster"); } catch (ConfigException ignore) { } if (cluster != null) { // Create a new cluster submitter = newClusterSubmitter(emr, tag, stepCompiler, cluster, filer, parameterCompiler); } else { // Cluster ID? Use existing cluster. String clusterId = params.get("cluster", String.class); submitter = existingClusterSubmitter(emr, tag, stepCompiler, clusterId, filer); } // Submit EMR job SubmissionResult submission = submitter.submit(); // Wait for the steps to finish running if (!steps.isEmpty()) { waitForSteps(emr, submission); } return result(submission); }
/**
 * Polls until the last submitted step is reported complete by {@code checkStepCompletion}.
 *
 * @param emr        EMR client used by the completion check
 * @param submission the submission whose last step id is awaited
 */
private void waitForSteps(AmazonElasticMapReduce emr, SubmissionResult submission) {
    // Only the final step is tracked.
    String finalStepId = Iterables.getLast(submission.stepIds());

    pollingWaiter(state, "result")
            .withWaitMessage("EMR job still running: %s", submission.clusterId())
            .withPollInterval(DurationInterval.of(
                    Duration.ofSeconds(15),
                    Duration.ofMinutes(5)))
            .awaitOnce(
                    Step.class,
                    pollState -> checkStepCompletion(emr, submission, finalStepId, pollState));
}
private Submitter existingClusterSubmitter(AmazonElasticMapReduce emr, String tag, StepCompiler stepCompiler, String clusterId, Filer filer) { return () -> { List<String> stepIds = pollingRetryExecutor(state, "submission") .retryUnless(AmazonServiceException.class, Aws::isDeterministicException) .withRetryInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5))) .runOnce(new TypeReference<List<String>>() {}, s -> { RemoteFile runner = prepareRunner(filer, tag); // Compile steps stepCompiler.compile(runner); // Stage files to S3 filer.stageFiles(); AddJobFlowStepsRequest request = new AddJobFlowStepsRequest() .withJobFlowId(clusterId) .withSteps(stepCompiler.stepConfigs()); int steps = request.getSteps().size(); logger.info("Submitting {} EMR step(s) to {}", steps, clusterId); AddJobFlowStepsResult result = emr.addJobFlowSteps(request); logSubmittedSteps(clusterId, steps, i -> request.getSteps().get(i).getName(), i -> result.getStepIds().get(i)); return ImmutableList.copyOf(result.getStepIds()); }); return SubmissionResult.ofExistingCluster(clusterId, stepIds); }; }
private Submitter newClusterSubmitter(AmazonElasticMapReduce emr, String tag, StepCompiler stepCompiler, Config clusterConfig, Filer filer, ParameterCompiler parameterCompiler) { return () -> { // Start cluster NewCluster cluster = pollingRetryExecutor(state, "submission") .withRetryInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5))) // TODO: EMR requests are not idempotent, thus retrying might produce duplicate cluster submissions. .retryUnless(AmazonServiceException.class, Aws::isDeterministicException) .runOnce(NewCluster.class, s -> submitNewClusterRequest(emr, tag, stepCompiler, clusterConfig, filer, parameterCompiler)); // Get submitted step IDs List<String> stepIds = pollingRetryExecutor(this.state, "steps") .withRetryInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5))) .retryUnless(AmazonServiceException.class, Aws::isDeterministicException) .runOnce(new TypeReference<List<String>>() {}, s -> { List<StepSummary> steps = listSubmittedSteps(emr, tag, cluster); logSubmittedSteps(cluster.id(), cluster.steps(), i -> steps.get(i).getName(), i -> steps.get(i).getId()); return steps.stream().map(StepSummary::getId).collect(toList()); }); // Log cluster status while waiting for it to come up pollingWaiter(state, "bootstrap") .withWaitMessage("EMR cluster still booting") .withPollInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5))) .awaitOnce(String.class, pollState -> checkClusterBootStatus(emr, cluster, pollState)); return SubmissionResult.ofNewCluster(cluster.id(), stepIds); }; }
/** * Helper method to determine if an Amazon EMR cluster exists * * @param client * The {@link AmazonElasticMapReduceClient} with read permissions * @param clusterIdentifier * The Amazon EMR cluster to check * @return true if the Amazon EMR cluster exists, otherwise false */ public static boolean clusterExists(AmazonElasticMapReduce client, String clusterIdentifier) { if (clusterIdentifier != null && !clusterIdentifier.isEmpty()) { ListClustersResult clustersList = client.listClusters(); ListIterator<ClusterSummary> iterator = clustersList.getClusters().listIterator(); ClusterSummary summary; for (summary = iterator.next() ; iterator.hasNext();summary = iterator.next()) { if (summary.getId().equals(clusterIdentifier)) { DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest().withClusterId(clusterIdentifier); DescribeClusterResult result = client.describeCluster(describeClusterRequest); if (result != null) { Cluster cluster = result.getCluster(); //check if HBase is installed on this cluster if (isHBaseInstalled(client, cluster.getId())) return false; String state = cluster.getStatus().getState(); LOG.info(clusterIdentifier + " is " + state + ". "); if (state.equalsIgnoreCase("RUNNING") ||state.equalsIgnoreCase("WAITING")) { LOG.info("The cluster with id " + clusterIdentifier + " exists and is " + state); return true; } } } } } LOG.info("The cluster with id " + clusterIdentifier + " does not exist"); return false; }
/**
 * Helper method to determine the master node public DNS of an Amazon EMR cluster
 *
 * @param client - The {@link AmazonElasticMapReduceClient} with read permissions
 * @param clusterId - unique identifier for this cluster
 * @return public dns url
 * @throws RuntimeException if the service returns no job flow for {@code clusterId}
 */
public static String getPublicDns(AmazonElasticMapReduce client, String clusterId) {
    DescribeJobFlowsResult describeJobFlows = client.describeJobFlows(new DescribeJobFlowsRequest().withJobFlowIds(clusterId));
    List<JobFlowDetail> jobFlows = describeJobFlows.getJobFlows();
    if (jobFlows == null || jobFlows.isEmpty()) {
        // Fail with a descriptive message instead of a bare index error on get(0).
        throw new RuntimeException("ERROR: No job flow found for cluster " + clusterId);
    }

    JobFlowInstancesDetail instancesDetail = jobFlows.get(0).getInstances();
    String masterPublicDns = instancesDetail.getMasterPublicDnsName();
    LOG.info("EMR cluster public DNS is " + masterPublicDns);
    return masterPublicDns;
}
/**
 * Helper method to determine if HBase is installed on this cluster, by looking for a
 * bootstrap action named "Install HBase" (case-insensitive).
 *
 * @param client - The {@link AmazonElasticMapReduceClient} with read permissions
 * @param clusterId - unique identifier for this cluster
 * @return true when the bootstrap action is present; never returns false
 * @throws RuntimeException when no such bootstrap action is found
 */
private static boolean isHBaseInstalled(AmazonElasticMapReduce client, String clusterId) {
    ListBootstrapActionsRequest request = new ListBootstrapActionsRequest().withClusterId(clusterId);
    ListBootstrapActionsResult listed = client.listBootstrapActions(request);

    for (Command action : listed.getBootstrapActions()) {
        if (action.getName().equalsIgnoreCase("Install HBase")) {
            return true;
        }
    }
    throw new RuntimeException("ERROR: Apache HBase is not installed on this cluster!!");
}
/**
 * Looks up the first cluster, among those listed for the active cluster states, whose name
 * equals {@code name}.
 *
 * @param emr  EMR client used for the listing
 * @param name exact cluster name to look for
 * @return the first matching cluster summary, or empty when none matches
 */
private Optional<ClusterSummary> findClusterWithName(AmazonElasticMapReduce emr, String name) {
    ListClustersRequest activeOnly = new ListClustersRequest().withClusterStates(activeClusterStates);
    return emr.listClusters(activeOnly)
            .getClusters()
            .stream()
            .filter(summary -> summary.getName().equals(name))
            .findFirst();
}
/**
 * Lists the names of the clusters returned for the active cluster states.
 * The listed clusters are also passed to {@code logClusters}.
 *
 * @return cluster names
 */
public List<String> listActiveClusterNames() {
    AmazonElasticMapReduce emr = sparkEmrClientBuilder.build();
    ListClustersRequest activeOnly = new ListClustersRequest().withClusterStates(activeClusterStates);
    List<ClusterSummary> summaries = emr.listClusters(activeOnly).getClusters();
    logClusters(summaries);

    List<String> names = new ArrayList<>(summaries.size());
    for (int i = 0; i < summaries.size(); i++) {
        names.add(summaries.get(i).getName());
    }
    return names;
}
/**
 * Lists the ids of the clusters returned for the active cluster states.
 * The listed clusters are also passed to {@code logClusters}.
 *
 * @return cluster IDs
 */
public List<String> listActiveClusterIds() {
    AmazonElasticMapReduce emr = sparkEmrClientBuilder.build();
    ListClustersRequest activeOnly = new ListClustersRequest().withClusterStates(activeClusterStates);
    List<ClusterSummary> summaries = emr.listClusters(activeOnly).getClusters();
    logClusters(summaries);

    List<String> ids = new ArrayList<>(summaries.size());
    for (int i = 0; i < summaries.size(); i++) {
        ids.add(summaries.get(i).getId());
    }
    return ids;
}
/**
 * Terminates the cluster named {@code sparkClusterName}.
 * Logs an error and does nothing else when no such cluster is found.
 */
public void terminateCluster() {
    AmazonElasticMapReduce emr = sparkEmrClientBuilder.build();
    Optional<ClusterSummary> found = findClusterWithName(emr, sparkClusterName);

    if (found.isPresent()) {
        String clusterId = found.get().getId();
        TerminateJobFlowsRequest terminate = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
        emr.terminateJobFlows(terminate);
        log.info(String.format("The cluster with id %s is terminating.", clusterId));
        return;
    }

    log.error(String.format("The cluster with name %s , requested for deletion, does not exist.", sparkClusterName));
}
/**
 * Polls the cluster every five seconds until it is no longer in one of the active states, or
 * until {@code sparkTimeoutDurationMinutes} have elapsed since this method was entered.
 *
 * On timeout the cluster is terminated. When the cluster leaves the active states on its own,
 * its steps are inspected and the outcome is logged.
 *
 * @param emr       EMR client
 * @param clusterId id of the cluster to monitor
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private void checkStatus(AmazonElasticMapReduce emr, String clusterId) throws InterruptedException {
    // Computed once: recomputing the deadline on every poll would push it into the future
    // forever, so the timeout could never fire.
    final long deadlineMillis = System.currentTimeMillis() + ((long) sparkTimeoutDurationMinutes * 60 * 1000);

    // Loop rather than recurse, so long monitoring sessions do not grow the call stack.
    while (true) {
        log.info(".");
        com.amazonaws.services.elasticmapreduce.model.Cluster cluster =
                emr.describeCluster(new DescribeClusterRequest().withClusterId(clusterId)).getCluster();
        String state = cluster.getStatus().getState();
        boolean active = Arrays.asList(activeClusterStates).contains(ClusterState.fromValue(state));

        if (!active) {
            StepSummary abnormalStep = null;
            List<StepSummary> steps = emr.listSteps(new ListStepsRequest().withClusterId(clusterId)).getSteps();
            for (StepSummary step : steps) {
                // Compare string contents; reference comparison would flag every step.
                if (!StepState.COMPLETED.toString().equals(step.getStatus().getState())) {
                    abnormalStep = step;
                }
            }
            if (abnormalStep != null) {
                log.error(String.format("Cluster %s terminated with an abnormal step, name %s, id %s",
                        clusterId, abnormalStep.getName(), abnormalStep.getId()));
            } else {
                log.info(String.format("Cluster %s terminated without error.", clusterId));
            }
            return;
        }

        if (System.currentTimeMillis() > deadlineMillis) {
            emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(clusterId));
            log.error("Timeout. Cluster terminated.");
            return;
        }

        Thread.sleep(5000);
    }
}
/**
 * Looks up the cluster named {@code sparkClusterName} and, when found, hands it to
 * {@code checkStatus} for monitoring. Logs an error when the cluster cannot be found.
 *
 * @throws InterruptedException propagated from {@code checkStatus}
 */
public void sparkMonitor() throws InterruptedException {
    AmazonElasticMapReduce emr = sparkEmrClientBuilder.build();
    Optional<ClusterSummary> found = findClusterWithName(emr, sparkClusterName);

    if (found.isPresent()) {
        ClusterSummary summary = found.get();
        log.info(String.format("found cluster with id %s, starting monitoring", summary.getId()));
        checkStatus(emr, summary.getId());
        return;
    }

    log.error(String.format("The cluster with name %s does not exist.", sparkClusterName));
}
private Optional<String> checkClusterBootStatus(AmazonElasticMapReduce emr, NewCluster cluster, TaskState state) { // Only creating a cluster, with no steps? boolean createOnly = cluster.steps() == 0; DescribeClusterResult describeClusterResult = pollingRetryExecutor(state, "describe-cluster") .withRetryInterval(DurationInterval.of(Duration.ofSeconds(30), Duration.ofMinutes(5))) .retryUnless(AmazonServiceException.class, Aws::isDeterministicException) .run(ds -> emr.describeCluster(new DescribeClusterRequest().withClusterId(cluster.id()))); ClusterStatus clusterStatus = describeClusterResult.getCluster().getStatus(); String clusterState = clusterStatus.getState(); switch (clusterState) { case "STARTING": logger.info("EMR cluster starting: {}", cluster.id()); return Optional.absent(); case "BOOTSTRAPPING": logger.info("EMR cluster bootstrapping: {}", cluster.id()); return Optional.absent(); case "RUNNING": case "WAITING": logger.info("EMR cluster up: {}", cluster.id()); return Optional.of(clusterState); case "TERMINATED_WITH_ERRORS": if (createOnly) { // TODO: log more information about the errors // TODO: inspect state change reason to figure out whether it was the boot that failed or e.g. steps submitted by another agent throw new TaskExecutionException("EMR boot failed: " + cluster.id()); } return Optional.of(clusterState); case "TERMINATING": if (createOnly) { // Keep waiting for the final state // TODO: inspect state change reason and bail early here return Optional.absent(); } return Optional.of(clusterState); case "TERMINATED": return Optional.of(clusterState); default: throw new RuntimeException("Unknown EMR cluster state: " + clusterState); } }
/** * This method uses method the AWS Java to launch an Apache HBase cluster on Amazon EMR. * * @param client - AmazonElasticMapReduce client that interfaces directly with the Amazon EMR Web Service * @param clusterIdentifier - identifier of an existing cluster * @param amiVersion - AMI to use for launching this cluster * @param keypair - A keypair for SSHing into the Amazon EMR master node * @param masterInstanceType - Master node Amazon EC2 instance type * @param coreInstanceType - core nodes Amazon EC2 instance type * @param logUri - An Amazon S3 bucket for your * @param numberOfNodes - total number of nodes in this cluster including master node * @return */ public static String createCluster(AmazonElasticMapReduce client, String clusterIdentifier, String amiVersion, String keypair, String masterInstanceType, String coreInstanceType, String logUri, int numberOfNodes) { if (clusterExists(client, clusterIdentifier)) { LOG.info("Cluster " + clusterIdentifier + " is available"); return clusterIdentifier; } //Error checking if (amiVersion == null || amiVersion.isEmpty()) throw new RuntimeException("ERROR: Please specify an AMI Version"); if (keypair == null || keypair.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair"); if (masterInstanceType == null || masterInstanceType.isEmpty()) throw new RuntimeException("ERROR: Please specify a Master Instance Type"); if (logUri == null || logUri.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs."); if (numberOfNodes < 0) throw new RuntimeException("ERROR: Please specify at least 1 node"); RunJobFlowRequest request = new RunJobFlowRequest() .withAmiVersion(amiVersion) .withBootstrapActions(new BootstrapActionConfig() .withName("Install HBase") .withScriptBootstrapAction(new ScriptBootstrapActionConfig() .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase"))) .withName("Job Flow With HBAse Actions") .withSteps(new StepConfig() 
//enable debugging step .withName("Enable debugging") .withActionOnFailure("TERMINATE_CLUSTER") .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()), //Start HBase step - after installing it with a bootstrap action createStepConfig("Start HBase","TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()), //add HBase backup step createStepConfig("Modify backup schedule","TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs())) .withLogUri(logUri) .withInstances(new JobFlowInstancesConfig() .withEc2KeyName(keypair) .withInstanceCount(numberOfNodes) .withKeepJobFlowAliveWhenNoSteps(true) .withMasterInstanceType(masterInstanceType) .withSlaveInstanceType(coreInstanceType)); RunJobFlowResult result = client.runJobFlow(request); String state = null; while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) { try { Thread.sleep(10 * 1000); LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available."); } catch (InterruptedException e) { } if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")){ LOG.error("Could not create EMR Cluster"); System.exit(-1); } } LOG.info("Created cluster " + result.getJobFlowId()); LOG.info("Cluster " + clusterIdentifier + " is available"); return result.getJobFlowId(); }
private void submitJob(AmazonElasticMapReduce emr, String mainClass, List<String> args, Map<String, String> sparkConfs, File uberJar) throws Exception { AmazonS3URI s3Jar = new AmazonS3URI(sparkS3JarFolder + "/" + uberJar.getName()); log.info(String.format("Placing uberJar %s to %s", uberJar.getPath(), s3Jar.toString())); PutObjectRequest putRequest = sparkS3PutObjectDecorator.call( new PutObjectRequest(s3Jar.getBucket(), s3Jar.getKey(), uberJar) ); sparkS3ClientBuilder.build().putObject(putRequest); // The order of these matters List<String> sparkSubmitArgs = Arrays.asList( "spark-submit", "--deploy-mode", "cluster", "--class", mainClass ); for (Map.Entry<String, String> e : sparkConfs.entrySet()) { sparkSubmitArgs.add(String.format("--conf %s = %s ", e.getKey(), e.getValue())); } sparkSubmitArgs.add(s3Jar.toString()); sparkSubmitArgs.addAll(args); StepConfig step = new StepConfig() .withActionOnFailure(ActionOnFailure.CONTINUE) .withName("Spark step") .withHadoopJarStep( new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs(sparkSubmitArgs) ); Optional<ClusterSummary> optCsr = findClusterWithName(emr, sparkClusterName); if (optCsr.isPresent()) { ClusterSummary csr = optCsr.get(); emr.addJobFlowSteps( new AddJobFlowStepsRequest() .withJobFlowId(csr.getId()) .withSteps(step)); log.info( String.format("Your job is added to the cluster with id %s.", csr.getId()) ); } else { // If the cluster wasn't started, it's assumed ot be throwaway List<StepConfig> steps = sparkRunJobFlowRequest.getSteps(); steps.add(step); RunJobFlowRequest jobFlowRequest = sparkRunJobFlowRequest .withSteps(steps) .withInstances(sparkJobFlowInstancesConfig.withKeepJobFlowAliveWhenNoSteps(false)); RunJobFlowResult res = emr.runJobFlow(jobFlowRequest); log.info("Your new cluster's id is %s.", res.getJobFlowId()); } }
/**
 * Submits a Spark job with the given main class, using {@code sparkSubmitConfs} as the Spark
 * configuration.
 *
 * @param args      application arguments
 * @param mainClass main class to run
 * @param uberJar   jar file containing the job
 * @throws Exception propagated from {@code submitJob}
 */
public void sparkSubmitJobWithMain(String[] args, String mainClass, File uberJar) throws Exception {
    AmazonElasticMapReduce client = sparkEmrClientBuilder.build();
    List<String> jobArgs = Arrays.asList(args);
    submitJob(client, mainClass, jobArgs, sparkSubmitConfs, uberJar);
}
/**
 * Helper method to determine the Amazon EMR cluster state
 *
 * @param client
 *        The {@link AmazonElasticMapReduceClient} with read permissions
 * @param clusterIdentifier
 *        The Amazon EMR cluster to get the state of - e.g. j-2A98VJHDSU48M
 * @return The String representation of the Amazon EMR cluster state, or null when the describe
 *         call returns no result
 */
public static String clusterState(AmazonElasticMapReduce client, String clusterIdentifier) {
    DescribeClusterResult described =
            client.describeCluster(new DescribeClusterRequest().withClusterId(clusterIdentifier));
    return (described == null)
            ? null
            : described.getCluster().getStatus().getState();
}