private void addCustomBootstrapActionConfig(EmrClusterDefinition emrClusterDefinition, ArrayList<BootstrapActionConfig> bootstrapActions) { // Add Custom bootstrap script support if needed if (!CollectionUtils.isEmpty(emrClusterDefinition.getCustomBootstrapActionAll())) { for (ScriptDefinition scriptDefinition : emrClusterDefinition.getCustomBootstrapActionAll()) { BootstrapActionConfig customActionConfigAll = getBootstrapActionConfig(scriptDefinition.getScriptName(), scriptDefinition.getScriptLocation()); ArrayList<String> argList = new ArrayList<>(); if (!CollectionUtils.isEmpty(scriptDefinition.getScriptArguments())) { for (String argument : scriptDefinition.getScriptArguments()) { // Trim the argument argList.add(argument.trim()); } } // Set arguments to bootstrap action customActionConfigAll.getScriptBootstrapAction().setArgs(argList); bootstrapActions.add(customActionConfigAll); } } }
private void addDaemonBootstrapActionConfig(EmrClusterDefinition emrClusterDefinition, ArrayList<BootstrapActionConfig> bootstrapActions) { // Add daemon Configuration support if needed if (!CollectionUtils.isEmpty(emrClusterDefinition.getDaemonConfigurations())) { BootstrapActionConfig daemonBootstrapActionConfig = getBootstrapActionConfig(ConfigurationValue.EMR_CONFIGURE_DAEMON.getKey(), configurationHelper.getProperty(ConfigurationValue.EMR_CONFIGURE_DAEMON)); // Add arguments to the bootstrap script ArrayList<String> argList = new ArrayList<>(); for (Parameter daemonConfig : emrClusterDefinition.getDaemonConfigurations()) { argList.add(daemonConfig.getName() + "=" + daemonConfig.getValue()); } // Add the bootstrap action with arguments daemonBootstrapActionConfig.getScriptBootstrapAction().setArgs(argList); bootstrapActions.add(daemonBootstrapActionConfig); } }
/** * Create the bootstrap action configuration List from all the bootstrapping scripts specified. * * @param emrClusterDefinition the EMR definition name value. * * @return list of bootstrap action configurations that contains all the bootstrap actions for the given configuration. */ private ArrayList<BootstrapActionConfig> getBootstrapActionConfigList(EmrClusterDefinition emrClusterDefinition) { // Create the list ArrayList<BootstrapActionConfig> bootstrapActions = new ArrayList<>(); // Add encryption script support if needed if (emrClusterDefinition.isEncryptionEnabled() != null && emrClusterDefinition.isEncryptionEnabled()) { bootstrapActions.add(getBootstrapActionConfig(ConfigurationValue.EMR_ENCRYPTION_SCRIPT.getKey(), getEncryptionScriptLocation())); } // Add bootstrap actions. addDaemonBootstrapActionConfig(emrClusterDefinition, bootstrapActions); addHadoopBootstrapActionConfig(emrClusterDefinition, bootstrapActions); addCustomBootstrapActionConfig(emrClusterDefinition, bootstrapActions); addCustomMasterBootstrapActionConfig(emrClusterDefinition, bootstrapActions); // Return the object return bootstrapActions; }
private void addCustomMasterBootstrapActionConfig(EmrClusterDefinition emrClusterDefinition, ArrayList<BootstrapActionConfig> bootstrapActions) { // Add Master custom bootstrap script support if needed if (!CollectionUtils.isEmpty(emrClusterDefinition.getCustomBootstrapActionMaster())) { for (ScriptDefinition scriptDefinition : emrClusterDefinition.getCustomBootstrapActionMaster()) { BootstrapActionConfig bootstrapActionConfig = getBootstrapActionConfig(scriptDefinition.getScriptName(), configurationHelper.getProperty(ConfigurationValue.EMR_CONDITIONAL_SCRIPT)); // Add arguments to the bootstrap script ArrayList<String> argList = new ArrayList<>(); // Execute this script only on the master node. argList.add(configurationHelper.getProperty(ConfigurationValue.EMR_NODE_CONDITION)); argList.add(scriptDefinition.getScriptLocation()); if (!CollectionUtils.isEmpty(scriptDefinition.getScriptArguments())) { for (String argument : scriptDefinition.getScriptArguments()) { // Trim the argument argList.add(argument.trim()); } } bootstrapActionConfig.getScriptBootstrapAction().setArgs(argList); bootstrapActions.add(bootstrapActionConfig); } } }
private void addHadoopBootstrapActionConfig(EmrClusterDefinition emrClusterDefinition, ArrayList<BootstrapActionConfig> bootstrapActions) { // Add hadoop Configuration support if needed if (!CollectionUtils.isEmpty(emrClusterDefinition.getHadoopConfigurations())) { ArrayList<String> argList = new ArrayList<>(); BootstrapActionConfig hadoopBootstrapActionConfig = getBootstrapActionConfig(ConfigurationValue.EMR_CONFIGURE_HADOOP.getKey(), configurationHelper.getProperty(ConfigurationValue.EMR_CONFIGURE_HADOOP)); // If config files are available, add them as arguments for (Object hadoopConfigObject : emrClusterDefinition.getHadoopConfigurations()) { // If the Config Files are available, add them as arguments if (hadoopConfigObject instanceof ConfigurationFiles) { for (ConfigurationFile configurationFile : ((ConfigurationFiles) hadoopConfigObject).getConfigurationFiles()) { argList.add(configurationFile.getFileNameShortcut()); argList.add(configurationFile.getConfigFileLocation()); } } // If the key value pairs are available, add them as arguments if (hadoopConfigObject instanceof KeyValuePairConfigurations) { for (KeyValuePairConfiguration keyValuePairConfiguration : ((KeyValuePairConfigurations) hadoopConfigObject) .getKeyValuePairConfigurations()) { argList.add(keyValuePairConfiguration.getKeyValueShortcut()); argList.add(keyValuePairConfiguration.getAttribKey() + "=" + keyValuePairConfiguration.getAttribVal()); } } } // Add the bootstrap action with arguments hadoopBootstrapActionConfig.getScriptBootstrapAction().setArgs(argList); bootstrapActions.add(hadoopBootstrapActionConfig); } }
/** * Create the BootstrapActionConfig object from the bootstrap script. * * @param scriptDescription bootstrap script name to be displayed. * @param bootstrapScript location of the bootstrap script. * * @return bootstrap action configuration that contains all the bootstrap actions for the given configuration. */ private BootstrapActionConfig getBootstrapActionConfig(String scriptDescription, String bootstrapScript) { // Create the BootstrapActionConfig object BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig(); ScriptBootstrapActionConfig bootstrapConfigScript = new ScriptBootstrapActionConfig(); // Set the bootstrapScript bootstrapConfig.setName(scriptDescription); bootstrapConfigScript.setPath(bootstrapScript); bootstrapConfig.setScriptBootstrapAction(bootstrapConfigScript); // Return the object return bootstrapConfig; }
/**
 * Builds the EMR bootstrap action for one entry of the bootstrap configuration.
 * <p>
 * The entry is either a plain string (the script reference) or an object with a {@code path} and
 * optional {@code name}, {@code env}, {@code files} and {@code args} settings. The script and any
 * extra files are registered with the filer, a command runner configuration describing how to run
 * the script is serialized to {@code config.json} and registered as well, and the returned action
 * invokes the runner with the remote location of that configuration file as its only argument.
 *
 * @param index position of the action; used to label the staged files and the working directory
 * @param action JSON node describing the bootstrap action (textual or object)
 * @param tag tag passed through to the filer for every staged file
 * @param filer prepares the remote files
 * @param runner remote file whose S3 URI becomes the bootstrap script path
 * @param parameterCompiler compiles the {@code env} and {@code args} parameters
 * @return the bootstrap action configuration
 * @throws IOException if the command runner configuration cannot be serialized
 * @throws ConfigException if {@code action} is neither textual nor an object
 */
private BootstrapActionConfig bootstrapAction(int index, JsonNode action, String tag, Filer filer, RemoteFile runner, ParameterCompiler parameterCompiler)
        throws IOException
{
    final boolean textual = action.isTextual();
    if (!textual && !action.isObject()) {
        throw new ConfigException("Invalid bootstrap action: " + action);
    }

    // A textual action has no settings of its own, so it gets an empty config.
    final Config config = textual
            ? request.getConfig().getFactory().create()
            : request.getConfig().getFactory().create(action);
    final String scriptPath = textual ? action.asText() : config.get("path", String.class);
    final FileReference scriptReference = fileReference("bootstrap", scriptPath);
    // The action name defaults to the script's file name.
    final String actionName = textual
            ? scriptReference.filename()
            : config.get("name", String.class, scriptReference.filename());

    final String indexLabel = Integer.toString(index);

    RemoteFile scriptFile = filer.prepareRemoteFile(tag, "bootstrap", indexLabel, scriptReference, false);

    CommandRunnerConfiguration runnerConfiguration = CommandRunnerConfiguration.builder()
            .workingDirectory(bootstrapWorkingDirectory(index))
            .env(parameterCompiler.parameters(config.getNestedOrGetEmpty("env"), (key, value) -> value))
            .addDownload(DownloadConfig.of(scriptFile, 0777))
            .addAllDownload(config.getListOrEmpty("files", String.class).stream()
                    .map(extraFile -> fileReference("file", extraFile))
                    .map(extraReference -> filer.prepareRemoteFile(tag, "bootstrap", indexLabel, extraReference, false, bootstrapWorkingDirectory(index)))
                    .collect(toList()))
            .addCommand(scriptFile.localPath())
            .addAllCommand(parameterCompiler.parameters(config, "args"))
            .build();

    // The serialized runner configuration is staged as an in-memory ("direct") file.
    FileReference runnerConfigurationReference = ImmutableFileReference.builder()
            .type(FileReference.Type.DIRECT)
            .contents(objectMapper.writeValueAsBytes(runnerConfiguration))
            .filename("config.json")
            .build();
    RemoteFile runnerConfigurationFile = filer.prepareRemoteFile(tag, "bootstrap", indexLabel, runnerConfigurationReference, false);

    ScriptBootstrapActionConfig scriptAction = new ScriptBootstrapActionConfig()
            .withPath(runner.s3Uri().toString())
            .withArgs(runnerConfigurationFile.s3Uri().toString());

    return new BootstrapActionConfig()
            .withName(actionName)
            .withScriptBootstrapAction(scriptAction);
}
/** * This method uses method the AWS Java to launch an Apache HBase cluster on Amazon EMR. * * @param client - AmazonElasticMapReduce client that interfaces directly with the Amazon EMR Web Service * @param clusterIdentifier - identifier of an existing cluster * @param amiVersion - AMI to use for launching this cluster * @param keypair - A keypair for SSHing into the Amazon EMR master node * @param masterInstanceType - Master node Amazon EC2 instance type * @param coreInstanceType - core nodes Amazon EC2 instance type * @param logUri - An Amazon S3 bucket for your * @param numberOfNodes - total number of nodes in this cluster including master node * @return */ public static String createCluster(AmazonElasticMapReduce client, String clusterIdentifier, String amiVersion, String keypair, String masterInstanceType, String coreInstanceType, String logUri, int numberOfNodes) { if (clusterExists(client, clusterIdentifier)) { LOG.info("Cluster " + clusterIdentifier + " is available"); return clusterIdentifier; } //Error checking if (amiVersion == null || amiVersion.isEmpty()) throw new RuntimeException("ERROR: Please specify an AMI Version"); if (keypair == null || keypair.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair"); if (masterInstanceType == null || masterInstanceType.isEmpty()) throw new RuntimeException("ERROR: Please specify a Master Instance Type"); if (logUri == null || logUri.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs."); if (numberOfNodes < 0) throw new RuntimeException("ERROR: Please specify at least 1 node"); RunJobFlowRequest request = new RunJobFlowRequest() .withAmiVersion(amiVersion) .withBootstrapActions(new BootstrapActionConfig() .withName("Install HBase") .withScriptBootstrapAction(new ScriptBootstrapActionConfig() .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase"))) .withName("Job Flow With HBAse Actions") .withSteps(new StepConfig() 
//enable debugging step .withName("Enable debugging") .withActionOnFailure("TERMINATE_CLUSTER") .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()), //Start HBase step - after installing it with a bootstrap action createStepConfig("Start HBase","TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()), //add HBase backup step createStepConfig("Modify backup schedule","TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs())) .withLogUri(logUri) .withInstances(new JobFlowInstancesConfig() .withEc2KeyName(keypair) .withInstanceCount(numberOfNodes) .withKeepJobFlowAliveWhenNoSteps(true) .withMasterInstanceType(masterInstanceType) .withSlaveInstanceType(coreInstanceType)); RunJobFlowResult result = client.runJobFlow(request); String state = null; while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) { try { Thread.sleep(10 * 1000); LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available."); } catch (InterruptedException e) { } if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")){ LOG.error("Could not create EMR Cluster"); System.exit(-1); } } LOG.info("Created cluster " + result.getJobFlowId()); LOG.info("Cluster " + clusterIdentifier + " is available"); return result.getJobFlowId(); }
/**
 * Validates the required settings and assembles the {@code RunJobFlowRequest} stored in
 * {@code this.runFlowRequest}.
 * <p>
 * The request always contains the jar step; an "Enable Debugging" step is placed before it when
 * debugging is enabled. A "Configure hadoop" bootstrap action is added only when
 * {@code taskTrackerMaxMapTasks} is positive, the log URI only when {@code logPathname} is
 * non-empty, and the EC2 key name only when it is set. When no master instance type is set, the
 * slaves instance type is used for the master as well.
 *
 * @throws NullPointerException if a required setting is null
 * @throws IllegalArgumentException if the number of instances is lower than 1
 */
void init() {

    requireNonNull(this.AWSAccessKey);
    // NOTE(review): the original checked AWSAccessKey twice. The second check was presumably
    // meant for the secret key field, which is not validated here - confirm and add it.
    requireNonNull(this.jarLocation);
    requireNonNull(this.jarArguments);
    requireNonNull(this.slavesInstanceType);
    requireNonNull(this.hadoopVersion);
    requireNonNull(this.jobFlowName);

    if (this.nInstances < 1) {
        throw new IllegalArgumentException(
                "the number of instance is lower than 1");
    }

    // The master defaults to the same instance type as the slaves
    if (this.masterInstanceType == null) {
        this.masterInstanceType = this.slavesInstanceType;
    }

    // Set the hadoop jar step
    final HadoopJarStepConfig hadoopJarStep = new HadoopJarStepConfig()
            .withJar(this.jarLocation.trim()).withArgs(this.jarArguments);

    // Set step config
    final StepConfig stepConfig = new StepConfig()
            .withName(this.jobFlowName + "-step").withHadoopJarStep(hadoopJarStep)
            .withActionOnFailure("TERMINATE_JOB_FLOW");

    // Set the instances
    final JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
            .withInstanceCount(this.nInstances)
            .withMasterInstanceType(this.masterInstanceType)
            .withSlaveInstanceType(this.slavesInstanceType)
            .withHadoopVersion(this.hadoopVersion);

    // Run flow
    this.runFlowRequest = new RunJobFlowRequest()
            .withName(this.jobFlowName)
            .withInstances(instances);

    // Enable or not debugging; the debugging step must run before the job step
    if (this.enableDebugging) {
        final StepConfig debuggingStep = new StepConfig()
                .withName("Enable Debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(new StepFactory().newEnableDebuggingStep());
        this.runFlowRequest.withSteps(debuggingStep, stepConfig);
    } else {
        this.runFlowRequest.withSteps(stepConfig);
    }

    // Limit the number of tasks in a task tracker
    if (this.taskTrackerMaxMapTasks > 0) {
        final ScriptBootstrapActionConfig scriptBootstrapAction = new ScriptBootstrapActionConfig()
                .withPath(
                        "s3n://eu-west-1.elasticmapreduce/bootstrap-actions/configure-hadoop")
                .withArgs("--site-key-value",
                        "mapreduce.tasktracker.map.tasks.maximum=" + this.taskTrackerMaxMapTasks);

        final BootstrapActionConfig configureHadoop = new BootstrapActionConfig()
                .withName("Configure hadoop")
                .withScriptBootstrapAction(scriptBootstrapAction);

        this.runFlowRequest.withBootstrapActions(configureHadoop);
    }

    // Set the log URI
    if (this.logPathname != null && !this.logPathname.isEmpty()) {
        this.runFlowRequest.withLogUri(this.logPathname);
    }

    // Set EC2 Key name
    if (this.ec2KeyName != null) {
        this.runFlowRequest.getInstances().setEc2KeyName(this.ec2KeyName);
    }
}