@Test public void testArrayWriteText() throws Exception { if (!canTest()) { return; } String txtValue = "CIAO MONDO !"; template.sendBody("direct:write_text4", txtValue); Configuration conf = new Configuration(); Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text4"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text4", conf); Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); reader.next(value); assertEquals(value.toString(), txtValue); IOHelper.close(reader); }
public static void main(String argv[]) throws Exception { String usage = "Content (-local | -dfs <namenode:port>) recno segment"; if (argv.length < 3) { System.out.println("usage:" + usage); return; } Options opts = new Options(); Configuration conf = NutchConfiguration.create(); GenericOptionsParser parser = new GenericOptionsParser(conf, opts, argv); String[] remainingArgs = parser.getRemainingArgs(); FileSystem fs = FileSystem.get(conf); try { int recno = Integer.parseInt(remainingArgs[0]); String segment = remainingArgs[1]; Path file = new Path(segment, DIR_NAME); System.out.println("Reading from file: " + file); ArrayFile.Reader contents = new ArrayFile.Reader(fs, file.toString(), conf); Content content = new Content(); contents.get(recno, content); System.out.println("Retrieved " + recno + " from file " + file); System.out.println(content); contents.close(); } finally { fs.close(); } }
@Test public void testReadStringArrayFile() throws Exception { if (!canTest()) { return; } final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath()); Configuration conf = new Configuration(); FileSystem fs1 = FileSystem.get(file.toUri(), conf); ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs1, "target/test/test-camel-string1", Text.class, CompressionType.NONE, new Progressable() { @Override public void progress() { } }); Text valueWritable = new Text(); String value = "CIAO!"; valueWritable.set(value); writer.append(valueWritable); writer.close(); MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); resultEndpoint.expectedMessageCount(1); context.addRoutes(new RouteBuilder() { public void configure() { from("hdfs2:localhost/" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0").to("mock:result"); } }); context.start(); resultEndpoint.assertIsSatisfied(); }
@Test public void testReadStringArrayFile() throws Exception { if (!canTest()) { return; } final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath()); Configuration conf = new Configuration(); FileSystem fs1 = FileSystem.get(file.toUri(), conf); ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs1, "target/test/test-camel-string1", Text.class, CompressionType.NONE, new Progressable() { @Override public void progress() { } }); Text valueWritable = new Text(); String value = "CIAO!"; valueWritable.set(value); writer.append(valueWritable); writer.close(); MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); resultEndpoint.expectedMessageCount(1); context.addRoutes(new RouteBuilder() { public void configure() { from("hdfs:localhost/" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0").to("mock:result"); } }); context.start(); resultEndpoint.assertIsSatisfied(); }