I am implementing the PageRank algorithm on Hadoop and, as the title says, I get the following error when I try to run the code:

Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
In my input file I store the graph node IDs as keys and some information about them as values. The input file has the following format:
1 \t 3.4,2,5,6,67
4 \t 4.2,77,2,7,83
...
Trying to make sense of what the error says, I tried using LongWritable as the main variable type, as the code below shows. That means I have:

map<LongWritable, LongWritable, LongWritable, LongWritable>
reduce<LongWritable, LongWritable, LongWritable, LongWritable>
But I have also tried:

map<Text, Text, Text, Text>
reduce<Text, Text, Text, Text>
and also:
reduce<...>
And I always end up with the same error. I think I am having trouble understanding what "expected" and "received" mean in the error message. Does it mean that my map function expected a LongWritable from my input file and received a Text instead? Is there a problem with the format of my input file, or with the variable types I am using?
Here is the full code. Could you tell me what I need to change, and where?
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.lang.Object.*;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Pagerank {

    public static class PRMap extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {

        public void map(LongWritable lineNum, LongWritable line,
                        OutputCollector<LongWritable, LongWritable> outputCollector,
                        Reporter reporter) throws IOException, InterruptedException {
            if (line.toString().length() == 0) {
                return;
            }

            Text key = new Text();
            Text value = new Text();
            LongWritable valuel = new LongWritable();
            StringTokenizer spline = new StringTokenizer(line.toString(), "\t");
            key.set(spline.nextToken());
            value.set(spline.nextToken());

            valuel.set(Long.parseLong(value.toString()));
            outputCollector.collect(lineNum, valuel);

            String info = value.toString();
            String splitter[] = info.split(",");

            if (splitter.length >= 3) {
                float f = Float.parseFloat(splitter[0]);
                float pagerank = f / (splitter.length - 2);

                for (int i = 2; i < splitter.length; i++) {
                    LongWritable key2 = new LongWritable();
                    LongWritable value2 = new LongWritable();
                    long l;

                    l = Long.parseLong(splitter[i]);
                    key2.set(l);
                    //key2.set(splitter[i]);
                    value2.set((long) f);
                    outputCollector.collect(key2, value2);
                }
            }
        }
    }

    public static class PRReduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        private Text result = new Text();

        public void reduce(LongWritable key, Iterator<LongWritable> values,
                           OutputCollector<LongWritable, LongWritable> results,
                           Reporter reporter) throws IOException, InterruptedException {
            float pagerank = 0;
            String allinone = ",";

            while (values.hasNext()) {
                LongWritable temp = values.next();
                String converted = temp.toString();
                String[] splitted = converted.split(",");

                if (splitted.length > 1) {
                    for (int i = 1; i < splitted.length; i++) {
                        allinone = allinone.concat(splitted[i]);
                        if (i != splitted.length - 1)
                            allinone = allinone.concat(",");
                    }
                } else {
                    float f = Float.parseFloat(splitted[0]);
                    pagerank = pagerank + f;
                }
            }

            String last = Float.toString(pagerank);
            last = last.concat(allinone);

            LongWritable value = new LongWritable();
            value.set(Long.parseLong(last));
            results.collect(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "pagerank_itr0");
        job.setJarByClass(Pagerank.class);
        job.setMapperClass(Pagerank.PRMap.class);
        job.setReducerClass(Pagerank.PRReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
    }
}
You are not setting the mapper output classes in the job configuration. Try setting the key and value classes on the Job with:
setMapOutputKeyClass();
setMapOutputValueClass();
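As a minimal sketch of how that could look in the posted main(), next to the other job.set* calls; the LongWritable arguments are an assumption based on the types the posted mapper is declared to emit, so adjust them if your actual map output types differ:

    // Assumed fix: declare the map output classes explicitly, matching the
    // Mapper<..., LongWritable, LongWritable> declaration in the posted code.
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LongWritable.class);

Without these calls, Hadoop assumes the map output classes are the same as the ones passed to setOutputKeyClass() and setOutputValueClass(), and any mismatch between that assumption and what the mapper actually emits shows up as a "Type mismatch" error at runtime.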