/**
 * Refreshes the stored status of every open EMR step.
 *
 * <p>Runs the query configured under {@code sql.retrieveOpenSteps}, asks EMR for the current
 * state of each returned {@code cluster_id}/{@code step_id} pair, and for steps that have reached
 * {@code COMPLETED} or {@code FAILED} queues an update (statement configured under
 * {@code sql.updateStepStatus}; parameter 1 is the state, parameter 2 the row's
 * {@code job_config_id}). All queued updates are sent as a single batch.
 *
 * <p>Steps in any other state are left untouched so that they are picked up again on a later run.
 *
 * @throws Exception if the database connection, either SQL statement, or the EMR call fails
 */
public void monitorEMRStep() throws Exception {
    final String completed = StepState.COMPLETED.toString();
    final String failed = StepState.FAILED.toString();

    try (Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props)) {
        // Driver.connect returns null when the driver does not recognise the URL.
        if (conn == null) {
            throw new IllegalStateException("MySQL driver did not accept the configured \"url\" property");
        }

        try (java.sql.Statement openStepsStmt = conn.createStatement();
             ResultSet openStepsRS = openStepsStmt.executeQuery(props.getProperty("sql.retrieveOpenSteps"));
             PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.updateStepStatus"))) {

            AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
            DescribeStepRequest stepReq = new DescribeStepRequest();

            while (openStepsRS.next()) {
                stepReq.setClusterId(openStepsRS.getString("cluster_id"));
                stepReq.setStepId(openStepsRS.getString("step_id"));
                String stepState = emr.describeStep(stepReq).getStep().getStatus().getState();

                // Only COMPLETED/FAILED are written back. Batching a row without binding
                // parameter 1 would either fail or silently reuse the previous row's state.
                if (!completed.equals(stepState) && !failed.equals(stepState)) {
                    continue;
                }

                ps.setString(1, stepState);
                ps.setString(2, openStepsRS.getString("job_config_id"));
                ps.addBatch();
            }

            ps.executeBatch();
        }
    }
}
/**
 * Looks up a single step of an EMR cluster.
 *
 * <p>Builds a {@link DescribeStepRequest} for the given ids, submits it through
 * {@code emrOperations} using the client obtained from {@code getEmrClient(awsParamsDto)}, and
 * returns the step carried by the response.
 *
 * @param clusterId the id of the cluster the step belongs to
 * @param stepId the id of the step to describe
 * @param awsParamsDto passed to {@code getEmrClient} to obtain the EMR client
 * @return the step contained in the describe-step response
 */
@Override
public Step getClusterStep(String clusterId, String stepId, AwsParamsDto awsParamsDto) {
    final DescribeStepRequest request = new DescribeStepRequest();
    request.setClusterId(clusterId);
    request.setStepId(stepId);

    return emrOperations.describeStepRequest(getEmrClient(awsParamsDto), request).getStep();
}