这次希望有人能回复我:我正在努力让我的代码使用分布式缓存(DistributedCache)运行。我已经把文件保存到 HDFS 上了,但是当我运行以下代码时:
import java.awt.image.BufferedImage; import java.awt.image.DataBufferByte; import java.awt.image.Raster; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.util.logging.Level; import java.util.logging.Logger; import javax.imageio.ImageIO; import org.apache.hadoop.filecache.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import java.lang.String; import java.lang.Runtime; import java.net.URI; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; public class blur2 { public static class BlurMapper extends MapReduceBase implements Mapper<Text, BytesWritable, LongWritable, BytesWritable> { OutputCollector<LongWritable, BytesWritable> goutput; int IMAGE_HEIGHT = 240; int IMAGE_WIDTH = 320; public BytesWritable Gmiu; public BytesWritable Gsigma; public BytesWritable w; byte[] bytes = new byte[IMAGE_HEIGHT*IMAGE_WIDTH*3]; public BytesWritable emit = new BytesWritable(bytes); int count = 0; int initVar = 125; public LongWritable l = new LongWritable(1); byte[] byte1 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH]; byte[] byte2 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH]; byte[] byte3 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH]; public void map(Text key, BytesWritable file,OutputCollector<LongWritable, BytesWritable> output, Reporter reporter) throws IOException { goutput = output; BufferedImage img = ImageIO.read(new ByteArrayInputStream(file.getBytes())); Raster ras=img.getData(); DataBufferByte db= (DataBufferByte)ras.getDataBuffer(); byte[] data = db.getData(); if(count==0){ for(int i=0;i<IMAGE_HEIGHT*IMAGE_WIDTH;i++) { byte1[i]=20; byte2[i]=125; } Gmiu = new BytesWritable(data); Gsigma = new 
BytesWritable(byte1); w = new BytesWritable(byte2); count++; } else{ byte1 = Gmiu.getBytes(); byte2 = Gsigma.getBytes(); byte3 = w.getBytes(); for(int i=0;i<IMAGE_HEIGHT*IMAGE_WIDTH;i++) { byte pixel = data[i]; Double tempmiu=new Double(0.0); Double tempsig=new Double(0.0); double temp1=0.0; double alpha = 0.05; tempmiu = (1-alpha)*byte1[i] + alpha*pixel; temp1=temp1+(pixel-byte1[i])*(pixel-byte1[i]); tempsig=(1-alpha)*byte2[i]+ alpha*temp1; byte1[i] = tempmiu.byteValue(); byte2[i]= tempsig.byteValue(); Double w1=new Double((1-alpha)*byte3[i]+alpha*100); byte3[i] = w1.byteValue(); } Gmiu.set(byte1,0,IMAGE_HEIGHT*IMAGE_WIDTH); Gsigma.set(byte2,0,IMAGE_HEIGHT*IMAGE_WIDTH); w.set(byte3,0,IMAGE_HEIGHT*IMAGE_WIDTH); } byte1 = Gsigma.getBytes(); for(int i=0;i<IMAGE_HEIGHT*IMAGE_WIDTH;i++) { bytes[i]=byte1[i]; } byte1 = Gsigma.getBytes(); for(int i=0;i<IMAGE_HEIGHT*IMAGE_WIDTH;i++) { bytes[IMAGE_HEIGHT*IMAGE_WIDTH+i]=byte1[i]; } byte1 = w.getBytes(); for(int i=0;i<IMAGE_HEIGHT*IMAGE_WIDTH;i++) { bytes[2*IMAGE_HEIGHT*IMAGE_WIDTH+i]=byte1[i]; } emit.set(bytes,0,3*IMAGE_HEIGHT*IMAGE_WIDTH); } @Override public void close(){ try{ goutput.collect(l, emit); } catch(Exception e){ e.printStackTrace(); System.exit(-1); } } } //end of first job , this is running perfectly public static void main(String[] args) throws URISyntaxException { if(args.length!=3) { System.err.println("Usage: blurvideo input output"); System.exit(-1); } JobClient client = new JobClient(); JobConf conf = new JobConf(blur2.class); conf.setOutputValueClass(BytesWritable.class); conf.setInputFormat(SequenceFileInputFormat.class); //conf.setNumMapTasks(n) SequenceFileInputFormat.addInputPath(conf, new Path(args[0])); TextOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(BlurMapper.class); conf.setNumReduceTasks(0); //conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class); client.setConf(conf); try { JobClient.runJob(conf); } catch (Exception e) { e.printStackTrace(); } // 
exec("jar cf /home/hmobile/hadoop-0.19.2/imag /home/hmobile/hadoop-0.19.2/output"); JobClient client2 = new JobClient(); JobConf conf2 = new JobConf(blur2.class); conf2.setOutputValueClass(BytesWritable.class); conf2.setInputFormat(SequenceFileInputFormat.class); //conf.setNumMapTasks(n) SequenceFileInputFormat.addInputPath(conf2, new Path(args[0])); SequenceFileOutputFormat.setOutputPath(conf2, new Path(args[2])); conf2.setMapperClass(BlurMapper2.class); conf2.setNumReduceTasks(0); DistributedCache.addCacheFile(new URI("~/ayush/output/part-00000"), conf2);// these files are already on the hdfs DistributedCache.addCacheFile(new URI("~/ayush/output/part-00001"), conf2); client2.setConf(conf2); try { JobClient.runJob(conf2); } catch (Exception e) { e.printStackTrace(); } } public static class BlurMapper2 extends MapReduceBase implements Mapper<Text, BytesWritable, LongWritable, BytesWritable> { int IMAGE_HEIGHT = 240; int T =60; int IMAGE_WIDTH = 320; public BytesWritable Gmiu; public BytesWritable Gsigma; public BytesWritable w; byte[] bytes = new byte[IMAGE_HEIGHT*IMAGE_WIDTH]; public BytesWritable emit = new BytesWritable(bytes); int initVar = 125;int gg=0; int K=64;int k=0,k1=0,k2=0; public LongWritable l = new LongWritable(1); byte[] Gmiu1 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH*K]; byte[] Gsigma1 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH*K]; byte[] w1 = new byte[IMAGE_HEIGHT*IMAGE_WIDTH*K]; public Path[] localFiles=new Path[2]; private FileSystem fs; @Override public void configure(JobConf conf2) { try { fs = FileSystem.getLocal(new Configuration()); localFiles = DistributedCache.getLocalCacheFiles(conf2); //System.out.println(localFiles[0].getName()); } catch (IOException ex) { Logger.getLogger(blur2.class.getName()).log(Level.SEVERE, null, ex); } } public void map(Text key, BytesWritable file,OutputCollector<LongWritable, BytesWritable> output, Reporter reporter) throws IOException { if(gg==0){ //System.out.println(localFiles[0].getName()); String wrd; String line; 
for(Path f:localFiles) { if(!f.getName().endsWith("crc")) { // FSDataInputStream localFile = fs.open(f); BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(fs.open(f))); int c = 0; try { while ((line = br.readLine()) != null) { StringTokenizer itr = new StringTokenizer(line, " "); while (itr.hasMoreTokens()) { wrd = itr.nextToken(); c++; int i = Integer.parseInt(wrd, 16); Integer I = new Integer(i); byte b = I.byteValue(); if (c < IMAGE_HEIGHT * IMAGE_WIDTH) { Gmiu1[k] = b;k++; } else { if ((c >= IMAGE_HEIGHT * IMAGE_WIDTH) && (c < 2 * IMAGE_HEIGHT * IMAGE_WIDTH)) { Gsigma1[k] = b;k1++; } else { w1[k] = b;k2++; } } } } } catch (IOException ex) { Logger.getLogger(blur2.class.getName()).log(Level.SEVERE, null, ex); } } catch (FileNotFoundException ex) { Logger.getLogger(blur2.class.getName()).log(Level.SEVERE, null, ex); } finally { try { br.close(); } catch (IOException ex) { Logger.getLogger(blur2.class.getName()).log(Level.SEVERE, null, ex); } } } } gg++; } } } }
我已经解决了很多问题,有谁能告诉我为什么会出现下面这个错误:
java.io.FileNotFoundException: File does not exist: ~/ayush/output/part-00000 at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:394) at org.apache.hadoop.filecache.DistributedCache.getTimestamp(DistributedCache.java:475) at org.apache.hadoop.mapred.JobClient.configureCommandLineOptions(JobClient.java:676) at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:774) at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1127) at blur2.main(blur2.java:175) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.util.RunJar.main(RunJar.java:165) at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
问题在于您使用的文件名“~/ayush/output/part-00000”依赖于 Unix shell(sh、bash、ksh)的波浪号扩展(tilde expansion),即由 shell 将“~”替换为主目录的路径名。
Java(以及 C、C++ 和大多数其他编程语言)不进行波浪号扩展。您需要将路径名写成“/home/ayush/output/part-00000”……或者波浪号形式实际展开后对应的任何绝对路径名。
严格来说,应该按以下方式创建URI:
new File("/home/ayush/output/part-00000").toURI()
而不是
new URI("/home/ayush/output/part-00000")
后者会创建一个不带“协议”(scheme)部分的 URI,这可能会导致问题。