public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "map side join"); Configuration conf = job.getConfiguration(); job.setJarByClass(getClass()); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TupleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TupleWritable.class); Class<? extends InputFormat> cls = null; job.setInputFormatClass(cls); // job.setInputFormatClass(CompositeInputFormat.class); // 导入路径设置为master和数据两种 TextInputFormat.addInputPaths(job, args[0]); TextInputFormat.addInputPaths(job, args[1]); conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose( "inner", KeyValueTextInputFormat.class, TextInputFormat.getInputPaths(job))); TextOutputFormat.setOutputPath(job, new Path(args[2])); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true)?0:1; }
public void map(Text key, TupleWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { // Get the first two elements in the tuple and output them output.collect((Text) value.get(0), (Text) value.get(1)); }
@Override public void map(IntWritable index, TupleWritable v, OutputCollector<IntWritable, VectorWritable> out, Reporter reporter) throws IOException { // Logging if (m_isDebuggingEnabled) { for (int i = 0; i < v.size(); i++) { Vector vector = ((VectorWritable) v.get(i)).get(); m_logMapper.writeChars("map,input,key=" + index + ",value=" + vector.toString() + "\n"); } } Vector firstVector = ((VectorWritable) v.get(0)).get(); Vector secondVector = ((VectorWritable) v.get(1)).get(); // outCardinality is resulting column size n // (l x m) * (m x n) = (l x n) boolean firstIsOutFrag = secondVector.size() == m_outCardinality; // outFrag is Matrix which has the resulting column cardinality // (matrixB) Vector outFrag = firstIsOutFrag ? secondVector : firstVector; // multiplier is Matrix which has the resulting row count // (transposed matrixA) Vector multiplier = firstIsOutFrag ? firstVector : secondVector; if (m_isDebuggingEnabled) { m_logMapper.writeChars("map,firstIsOutFrag=" + firstIsOutFrag + "\n"); m_logMapper.writeChars("map,outFrag=" + outFrag + "\n"); m_logMapper.writeChars("map,multiplier=" + multiplier + "\n"); } for (Vector.Element e : multiplier.nonZeroes()) { VectorWritable outVector = new VectorWritable(); // Scalar Multiplication (Vector x Element) outVector.set(outFrag.times(e.get())); out.collect(new IntWritable(e.index()), outVector); if (m_isDebuggingEnabled) { m_logMapper.writeChars("map,collect,key=" + e.index() + ",value=" + outVector.get().toString() + "\n"); } } if (m_isDebuggingEnabled) { m_logMapper.flush(); } }
@Override public void map(IntWritable index, TupleWritable v, OutputCollector<IntWritable, VectorWritable> out, Reporter reporter) throws IOException { // Set OutputCollector reference, for close method this.out = out; // Logging if (m_isDebuggingEnabled) { for (int i = 0; i < v.size(); i++) { Vector vector = ((VectorWritable) v.get(i)).get(); m_logMapper.writeChars("map,input,key=" + index + ",value=" + vector.toString() + "\n"); } } Vector firstVector = ((VectorWritable) v.get(0)).get(); Vector secondVector = ((VectorWritable) v.get(1)).get(); // outCardinality is resulting column size l // (n x m) * (m x l) = (n x l) boolean firstIsOutFrag = secondVector.size() == m_outCardinality; // outFrag is Matrix which has the resulting column cardinality // (matrixB) Vector outFrag = firstIsOutFrag ? secondVector : firstVector; // multiplier is Matrix which has the resulting row count // (transposed matrixA) Vector multiplier = firstIsOutFrag ? firstVector : secondVector; if (m_isDebuggingEnabled) { m_logMapper.writeChars("map,firstIsOutFrag=" + firstIsOutFrag + "\n"); m_logMapper.writeChars("map,outFrag=" + outFrag + "\n"); m_logMapper.writeChars("map,multiplier=" + multiplier + "\n"); } m_tranposedMatrixA.add(outFrag); m_matrixB.add(multiplier); }