/**
 * Initializes the state that is specific to this mapper, most notably the TSV parser.
 *
 * <p>Initialization shared with subclasses lives in {@code doSetup}. A subclass may override this
 * method, call {@code doSetup} itself, and then handle its own custom parameters.
 *
 * <p>Besides building the parser, this reads the target table's region start keys. It also reads
 * any index specification stored in the table descriptor under {@code Constants.INDEX_SPEC_KEY}.
 *
 * @param context the task context supplying the job configuration
 * @throws IOException if the target table cannot be opened or read
 * @throws RuntimeException if the column specification contains no row key column
 */
@Override
protected void setup(Context context) throws IOException {
  doSetup(context);
  Configuration jobConf = context.getConfiguration();

  // The separator field is populated by doSetup, so the parser must be built after it.
  parser = new TsvParser(jobConf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
  boolean rowKeyMissing = parser.getRowKeyColumnIndex() == -1;
  if (rowKeyMissing) {
    throw new RuntimeException("No row key column specified");
  }

  HTable table = null;
  try {
    table = new HTable(jobConf, jobConf.get(TableInputFormat.INPUT_TABLE));
    this.startKeys = table.getStartKeys();
    byte[] serializedIndexSpec =
        table.getTableDescriptor().getValue(Constants.INDEX_SPEC_KEY);
    if (serializedIndexSpec != null) {
      TableIndices decodedIndices = new TableIndices();
      decodedIndices.readFields(serializedIndexSpec);
      this.indices = decodedIndices.getIndices();
    }
  } finally {
    if (table != null) {
      table.close();
    }
  }
}
/** * Handles common parameter initialization that a subclass might want to leverage. * @param context */ protected void doSetup(Context context) { Configuration conf = context.getConfiguration(); // If a custom separator has been used, // decode it back from Base64 encoding. separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); if (separator == null) { separator = ImportTsv.DEFAULT_SEPARATOR; } else { separator = new String(Base64.decode(separator)); } // Should never get 0 as we are setting this to a valid value in job configuration. ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); badLineCount = context.getCounter("ImportTsv", "Bad Lines"); hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); indexedTable = conf.getBoolean(IndexMapReduceUtil.IS_INDEXED_TABLE, false); }
/**
 * Checks how the parser maps a column spec to families and qualifiers, and that it splits a
 * tab-separated line into the expected values.
 */
@Test
public void testTsvParser() throws BadTsvLineException {
  TsvParser tsvParser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");

  // Column 0 names only a family, so its qualifier is empty.
  assertBytesEquals(Bytes.toBytes("col_a"), tsvParser.getFamily(0));
  assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, tsvParser.getQualifier(0));
  // Column 1 names both a family and a qualifier.
  assertBytesEquals(Bytes.toBytes("col_b"), tsvParser.getFamily(1));
  assertBytesEquals(Bytes.toBytes("qual"), tsvParser.getQualifier(1));
  // Column 2 is the row key, which has neither.
  assertNull(tsvParser.getFamily(2));
  assertNull(tsvParser.getQualifier(2));

  assertEquals(2, tsvParser.getRowKeyColumnIndex());
  assertEquals(ImportTsv.TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
      tsvParser.getTimestampKeyColumnIndex());

  byte[] rawLine = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
  ParsedLine parsedLine = tsvParser.parse(rawLine, rawLine.length);
  checkParsing(parsedLine, Splitter.on("\t").split(Bytes.toString(rawLine)));
}
@Test public void testMROnTable() throws Exception { String TABLE_NAME = "testMROnTable"; String FAMILY = "FAM"; String INPUT_FILE = "InputFile.esv"; // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", TABLE_NAME, INPUT_FILE }; doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1); }
@Test public void testMROnTableWithTimestamp() throws Exception { String TABLE_NAME = "testMROnTableWithTimestamp"; String FAMILY = "FAM"; String INPUT_FILE = "InputFile1.csv"; // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE }; String data = "KEY,1234,VALUE1,VALUE2\n"; doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1); }
@Test public void testBulkOutputWithoutAnExistingTable() throws Exception { String TABLE_NAME = "testBulkOutputWithoutAnExistingTable"; String FAMILY = "FAM"; String INPUT_FILE = "InputFile2.esv"; // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", "-Dtable.columns.index=IDX1=>FAM:[A->String&10]",TABLE_NAME, INPUT_FILE }; doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1); }
@Test public void testIndexBulkLoad() throws Exception { String TABLE_NAME = "testIndexBulkLoad"; String FAMILY = "FAM"; String INPUT_FILE = "InputFile2.esv"; // Prepare the arguments required for the test. String[] args = new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", "-Dtable.columns.index=IDX1=>FAM:[A->String&10]",TABLE_NAME, INPUT_FILE }; doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1); }
/** * Import local file to table. * @param conf * @param args * @param TABLE_NAME * @param INPUT_FILE * @throws IOException * @throws InterruptedException * @throws ClassNotFoundException */ protected static void importLocalFile2Table( final Configuration conf, final String[] args, final String TABLE_NAME, final String INPUT_FILE) throws IOException, InterruptedException, ClassNotFoundException { Validate.notEmpty(INPUT_FILE, "INPUT_FILE shall not be empty or null"); InputStream ips = ClassLoader.getSystemResourceAsStream(INPUT_FILE); assertNotNull(ips); FileSystem fs = FileSystem.get(conf); FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true); IOUtils.write(IOUtils.toString(ips), op, HConstants.UTF8_ENCODING); IOUtils.closeQuietly(op); IOUtils.closeQuietly(ips); int length = args.length + 2; String[] newArgs = new String[length]; System.arraycopy(args, 0, newArgs, 0, args.length); newArgs[length - 2] = TABLE_NAME; // newArgs[length - 1] = INPUT_FILE_PATH + INPUT_FILE; newArgs[length - 1] = INPUT_FILE; Job job = ImportTsv.createSubmittableJob(conf, newArgs); job.waitForCompletion(true); assertTrue(job.isSuccessful()); }
/** * Sets up the actual job. * @param conf The current configuration. * @param args The command line parameters. * @return The newly created job. * @throws IOException When setting up the job fails. * @throws InterruptedException */ public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException, ClassNotFoundException { HBaseAdmin admin = new IndexAdmin(conf); // Support non-XML supported characters // by re-encoding the passed separator as a Base64 string. String actualSeparator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); if (actualSeparator != null) { conf.set(ImportTsv.SEPARATOR_CONF_KEY, Base64.encodeBytes(actualSeparator.getBytes())); } // See if a non-default Mapper was set String mapperClassName = conf.get(ImportTsv.MAPPER_CONF_KEY); Class mapperClass = mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER; String tableName = args[0]; Path inputDir = new Path(args[1]); String input = conf.get(IndexUtils.TABLE_INPUT_COLS); HTableDescriptor htd = null; if (!admin.tableExists(tableName)) { htd = ImportTsv.prepareHTableDescriptor(tableName, conf.getStrings(ImportTsv.COLUMNS_CONF_KEY)); if (input != null) { htd = IndexUtils.parse(tableName, htd, input, null); } admin.createTable(htd); } conf.set(TableInputFormat.INPUT_TABLE, tableName); conf.setBoolean(IndexMapReduceUtil.IS_INDEXED_TABLE, input != null); Job job = new Job(conf, NAME + "_" + tableName); job.setJarByClass(mapperClass); FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(mapperClass); String hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); if (hfileOutPath != null) { HTable table = new HTable(conf, tableName); job.setReducerClass(PutSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); 
IndexHFileOutputFormat.configureIncrementalLoad(job, table); } else { // No reducers. Just write straight to table. Call initTableReducerJob // to set up the TableOutputFormat. TableMapReduceUtil.initTableReducerJob(tableName, null, job); job.setNumReduceTasks(0); } TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration(), com.google.common.base.Function.class /* Guava used by TsvParser */); return job; }