@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
  final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
  final JobConf jobConf = new JobConf(dfs.getConf());
  jobConf.setInputFormat(inputFormat.getClass());
  reader = getRecordReader(inputFormat, jobConf);
  final Field keyField = new Field(keySchema, true, getArrowTypeForMajorType(KEY_TYPE), null);
  final Field valueField = new Field(valueSchema, true, getArrowTypeForMajorType(VALUE_TYPE), null);
  try {
    keyVector = output.addField(keyField, NullableVarBinaryVector.class);
    valueVector = output.addField(valueField, NullableVarBinaryVector.class);
  } catch (SchemaChangeException sce) {
    throw new ExecutionSetupException("Error in setting up sequencefile reader.", sce);
  }
}
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
  final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
  final JobConf jobConf = new JobConf(dfs.getConf());
  jobConf.setInputFormat(inputFormat.getClass());
  reader = getRecordReader(inputFormat, jobConf);
  final MaterializedField keyField = MaterializedField.create(keySchema, KEY_TYPE);
  final MaterializedField valueField = MaterializedField.create(valueSchema, VALUE_TYPE);
  try {
    keyVector = output.addField(keyField, NullableVarBinaryVector.class);
    valueVector = output.addField(valueField, NullableVarBinaryVector.class);
  } catch (SchemaChangeException sce) {
    throw new ExecutionSetupException("Error in setting up sequencefile reader.", sce);
  }
}
@Override
public int run(String[] args) throws Exception {
  JobConf conf = new JobConf(GeoLintHadoopSeqFile.class);
  // string to use for the job name and the output folder in HDFS
  String name = "GeoLintHadoop_" + System.currentTimeMillis();
  // set the task timeout to 30 minutes, as we may transfer and checksum ~4 GB files
  conf.set("mapred.task.timeout", Integer.toString(30 * 60 * 1000));
  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(name));
  conf.setJobName(name);
  // set the mapper to this class' mapper
  conf.setMapperClass(GeoLintMap.class);
  //conf.setReducerClass(GeoLintReduce.class);
  conf.setInputFormat(SequenceFileAsBinaryInputFormat.class);
  // sets how the output is written, cf. OutputFormat
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  // run with 28 reduce tasks
  conf.setNumReduceTasks(28);
  JobClient.runJob(conf);
  return 0;
}
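// A minimal driver sketch (not part of the original example), assuming
// GeoLintHadoopSeqFile implements org.apache.hadoop.util.Tool and has a
// no-arg constructor; ToolRunner parses generic Hadoop options before
// delegating to the run(String[]) method above.
public static void main(String[] args) throws Exception {
  int exitCode = org.apache.hadoop.util.ToolRunner.run(
      new org.apache.hadoop.conf.Configuration(), new GeoLintHadoopSeqFile(), args);
  System.exit(exitCode);
}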