Java 类com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest 实例源码

项目:aws-big-data-blog    文件:LambdaContainer.java   
public void monitorEMRStep() throws Exception {
    List<String> stepIds = new ArrayList<String>();
    Connection conn  = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
    ResultSet openStepsRS = conn.createStatement().executeQuery(props.getProperty("sql.retrieveOpenSteps"));
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    DescribeStepRequest stepReq=new  DescribeStepRequest();
    PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.updateStepStatus"));
    while(openStepsRS.next()){

        stepReq.setClusterId(openStepsRS.getString("cluster_id"));
        stepReq.setStepId(openStepsRS.getString("step_id"));
        String stepState = emr.describeStep(stepReq).getStep().getStatus().getState();

            if(stepState.equals(StepState.COMPLETED.toString())){
                ps.setString(1,StepState.COMPLETED.toString());
            }else if (stepState.equals(StepState.FAILED.toString())){
                ps.setString(1,StepState.FAILED.toString());                    
            }
            ps.setString(2,openStepsRS.getString("job_config_id"));
            ps.addBatch();
    }

    ps.executeBatch();
    ps.close();
    conn.close();
}
项目:herd    文件:EmrDaoImpl.java   
@Override
public Step getClusterStep(String clusterId, String stepId, AwsParamsDto awsParamsDto)
{
    DescribeStepRequest describeStepRequest = new DescribeStepRequest().withClusterId(clusterId).withStepId(stepId);
    return emrOperations.describeStepRequest(getEmrClient(awsParamsDto), describeStepRequest).getStep();
}