/**
 * Verifies that {@code TotalOrderPartitioner} assigns the expected partition to plain
 * {@code String} keys when Java serialization is registered and keys are ordered by
 * {@code JavaSerializationComparator}.
 * The partition file written for the test is deleted afterwards, even on failure.
 *
 * @throws Exception if writing the partition file or partitioning fails
 */
public void testTotalOrderWithCustomSerialization() throws Exception {
    Configuration conf = new Configuration();
    conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
        JavaSerialization.class.getName(),
        WritableSerialization.class.getName());
    conf.setClass(MRJobConfig.KEY_COMPARATOR, JavaSerializationComparator.class, Comparator.class);

    Path partitionFile = TestTotalOrderPartitioner.<String>writePartitionFile(
        "totalordercustomserialization", conf, splitJavaStrings);
    // Configured after the partition file is written, just before the partitioner reads conf.
    conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class);

    TotalOrderPartitioner<String, NullWritable> partitioner =
        new TotalOrderPartitioner<String, NullWritable>();
    try {
        partitioner.setConf(conf);
        NullWritable value = NullWritable.get();
        for (Check<String> expected : testJavaStrings) {
            String label = expected.data.toString();
            int actual = partitioner.getPartition(expected.data, value, splitJavaStrings.length + 1);
            assertEquals(label, expected.part, actual);
        }
    } finally {
        // Always remove the temporary partition file.
        partitionFile.getFileSystem(conf).delete(partitionFile, true);
    }
}
/**
 * Tests read/write of {@link Integer} values via Hadoop's native {@code JavaSerialization}
 * wrapped in {@code HadoopSerializationWrapper}.
 *
 * @throws Exception If the write/read round trip fails.
 */
public void testIntJavaSerialization() throws Exception {
    HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutput sink = new DataOutputStream(bytes);

    ser.write(sink, 3);
    ser.write(sink, -5);
    // NOTE(review): the wrapper is closed here, yet the same instance is used for the reads
    // below; this relies on the wrapper tolerating read() after close() — confirm intended.
    ser.close();

    DataInput source = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));

    Integer first = (Integer)ser.read(source, null);
    assertEquals(3, first.intValue());

    Integer second = (Integer)ser.read(source, null);
    assertEquals(-5, second.intValue());
}
public AppWorkerContainer(AppConfig config) { this.config = config ; this.appContainerInfoHolder = new AppContainerInfoHolder(config.getAppWorkerContainerId()) ; try { Configuration rpcConf = new Configuration() ; rpcConf.set( CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + "," + WritableSerialization.class.getName() + "," + AvroSerialization.class.getName() ) ; rpcClient = new RPCClient(config.appHostName, config.appRpcPort) ; ipcService = IPCService.newBlockingStub(rpcClient.getRPCChannel()) ; Class<AppWorker> appWorkerClass = (Class<AppWorker>) Class.forName(config.worker) ; worker = appWorkerClass.newInstance() ; } catch(Throwable error) { LOGGER.error("Error" , error); onDestroy() ; } }
/** * Runs map reduce to do the sweeping on the mob files. * The running of the sweep tool on the same column family are mutually exclusive. * The HBase major compaction and running of the sweep tool on the same column family * are mutually exclusive. * These synchronization is done by the Zookeeper. * So in the beginning of the running, we need to make sure only this sweep tool is the only one * that is currently running in this column family, and in this column family there're no major * compaction in progress. * @param tn The current table name. * @param family The descriptor of the current column family. * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException * @throws KeeperException */ public void sweep(TableName tn, HColumnDescriptor family) throws IOException, ClassNotFoundException, InterruptedException, KeeperException { Configuration conf = new Configuration(this.conf); // check whether the current user is the same one with the owner of hbase root String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName(); FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR))); if (hbaseRootFileStat.length > 0) { String owner = hbaseRootFileStat[0].getOwner(); if (!owner.equals(currentUserName)) { String errorMsg = "The current user[" + currentUserName + "] doesn't have the privilege." 
+ " Please make sure the user is the root of the target HBase"; LOG.error(errorMsg); throw new IOException(errorMsg); } } else { LOG.error("The target HBase doesn't exist"); throw new IOException("The target HBase doesn't exist"); } String familyName = family.getNameAsString(); Job job = null; try { Scan scan = new Scan(); // Do not retrieve the mob data when scanning scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); scan.setFilter(new ReferenceOnlyFilter()); scan.setCaching(10000); scan.setCacheBlocks(false); scan.setMaxVersions(family.getMaxVersions()); conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); job = prepareJob(tn, familyName, scan, conf); job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName); // Record the compaction start time. // In the sweep tool, only the mob file whose modification time is older than // (startTime - delay) could be handled by this tool. // The delay is one day. It could be configured as well, but this is only used // in the test. job.getConfiguration().setLong(MobConstants.MOB_COMPACTION_START_DATE, compactionStartTime); job.setPartitionerClass(MobFilePathHashPartitioner.class); submit(job, tn, familyName); if (job.waitForCompletion(true)) { // Archive the unused mob files. removeUnusedFiles(job, tn, family); } } finally { cleanup(job, tn, familyName); } }
@Test public void testRun() throws Exception { byte[] mobValueBytes = new byte[100]; //get the path where mob files lie in Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), TableName.valueOf(tableName), family); Put put = new Put(Bytes.toBytes(row)); put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes); Put put2 = new Put(Bytes.toBytes(row + "ignore")); put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes); table.put(put); table.put(put2); table.flushCommits(); admin.flush(tableName); FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath); //check the generation of a mob file assertEquals(1, fileStatuses.length); String mobFile1 = fileStatuses[0].getPath().getName(); Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); configuration.setFloat(MobConstants.MOB_COMPACTION_INVALID_FILE_RATIO, 0.1f); configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName); configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family); configuration.setStrings("mob.compaction.visited.dir", "jobWorkingNamesDir"); configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir"); configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName()); configuration.set("mob.compaction.visited.dir", "compactionVisitedDir"); configuration.setLong(MobConstants.MOB_COMPACTION_START_DATE, System.currentTimeMillis() + 24 * 3600 * 1000); //use the same counter when mocking Counter counter = new GenericCounter(); Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class); when(ctx.getConfiguration()).thenReturn(configuration); when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter); when(ctx.nextKey()).thenReturn(true).thenReturn(false); when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1)); byte[] refBytes = Bytes.toBytes(mobFile1); long valueLength = refBytes.length; byte[] 
newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes); KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue); List<KeyValue> list = new ArrayList<KeyValue>(); list.add(kv2); when(ctx.getValues()).thenReturn(list); SweepReducer reducer = new SweepReducer(); reducer.run(ctx); FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath); String mobFile2 = filsStatuses2[0].getPath().getName(); //new mob file is generated, old one has been archived assertEquals(1, filsStatuses2.length); assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1)); //test sequence file String workingPath = configuration.get("mob.compaction.visited.dir"); FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath)); Set<String> files = new TreeSet<String>(); for (FileStatus st : statuses) { files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(), st.getPath(), configuration)); } assertEquals(1, files.size()); assertEquals(true, files.contains(mobFile1)); }