/**
 * Creates a record reader over the files of a {@link MultiFileSplit} and opens the file at
 * index {@code count} of the split for line-by-line reading.
 *
 * @param conf  configuration used to resolve the file system that holds the file being opened
 * @param split the split whose paths and total length are recorded in this reader
 * @throws IOException if the split has no path at index {@code count}, or the file cannot be
 *                     opened
 */
public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
        throws IOException {
    this.split = split;
    this.paths = split.getPaths();
    this.totLength = split.getLength();
    this.offset = 0;

    // Report an empty (or too short) split as a declared IOException instead of letting
    // paths[count] throw ArrayIndexOutOfBoundsException.
    // NOTE(review): count is presumably 0 at construction (it selects the first file) - confirm.
    if (count >= paths.length) {
        throw new IOException("MultiFileSplit has no path at index " + count
                + " (split contains " + paths.length + " path(s))");
    }

    // open the first file
    Path file = paths[count];
    // Resolve the file system from the path itself, not the default file system, so
    // fully-qualified paths on another scheme can be opened. For unqualified paths this is
    // the same file system FileSystem.get(conf) returns.
    fs = file.getFileSystem(conf);
    currentStream = fs.open(file);
    // Decode explicitly as UTF-8 (the encoding of Hadoop Text) rather than the platform default.
    currentReader = new BufferedReader(new InputStreamReader(currentStream, "UTF-8"));
}
public WarcFileRecordReader(Configuration conf, InputSplit split) throws IOException { this.fs = FileSystem.get(conf); this.conf = conf; if (split instanceof FileSplit) { this.filePathList = new Path[1]; this.filePathList[0] = ((FileSplit) split).getPath(); } else if (split instanceof MultiFileSplit) { this.filePathList = ((MultiFileSplit) split).getPaths(); } else { throw new IOException("InputSplit is not a file split or a multi-file split - aborting"); } // get the total file sizes for (int i = 0; i < filePathList.length; i++) { totalFileSize += fs.getFileStatus(filePathList[i]).getLen(); } Class<? extends CompressionCodec> codecClass = null; try { codecClass = conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec") .asSubclass(CompressionCodec.class); compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); } catch (ClassNotFoundException cnfEx) { compressionCodec = null; LOG.info("!!! ClassNotFound Exception thrown setting Gzip codec"); } openNextFile(); }
public WarcFileRecordReader(Configuration conf, InputSplit split) throws IOException { if (split instanceof FileSplit) { this.filePathList=new Path[1]; this.filePathList[0]=((FileSplit)split).getPath(); } else if (split instanceof MultiFileSplit) { this.filePathList=((MultiFileSplit)split).getPaths(); } else { throw new IOException("InputSplit is not a file split or a multi-file split - aborting"); } // Use FileSystem.get to open Common Crawl URIs using the S3 protocol. URI uri = filePathList[0].toUri(); this.fs = FileSystem.get(uri, conf); // get the total file sizes for (int i=0; i < filePathList.length; i++) { totalFileSize += fs.getFileStatus(filePathList[i]).getLen(); } Class<? extends CompressionCodec> codecClass=null; try { codecClass=conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec").asSubclass(CompressionCodec.class); compressionCodec=(CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); } catch (ClassNotFoundException cnfEx) { compressionCodec=null; LOG.info("!!! ClassNotFoun Exception thrown setting Gzip codec"); } openNextFile(); }
/**
 * Builds a {@link MultiFileLineRecordReader} for the given split.
 *
 * @param split    the split to read; it is cast to {@link MultiFileSplit}, so any other split
 *                 type fails with a {@link ClassCastException}
 * @param job      job configuration handed to the reader
 * @param reporter not used
 * @return a new reader over the split
 * @throws IOException if the reader cannot be constructed
 */
@Override
public RecordReader<WordOffset, Text> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
    final MultiFileSplit multiFileSplit = (MultiFileSplit) split;
    return new MultiFileLineRecordReader(job, multiFileSplit);
}
/**
 * Returns a line-oriented reader for a multi-file split.
 *
 * @param split    expected to be a {@link MultiFileSplit}; other types cause a
 *                 {@link ClassCastException} at the cast below
 * @param job      configuration passed to the {@link MultiFileLineRecordReader}
 * @param reporter ignored by this implementation
 * @return the newly created {@link MultiFileLineRecordReader}
 * @throws IOException propagated from the reader's constructor
 */
@Override
public RecordReader<WordOffset, Text> getRecordReader(InputSplit split, JobConf job,
        Reporter reporter) throws IOException {
    RecordReader<WordOffset, Text> reader =
            new MultiFileLineRecordReader(job, (MultiFileSplit) split);
    return reader;
}