public void testCustomOffsets() {
  Configuration conf = new Configuration();
  BinaryComparable first = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  BinaryComparable second = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });

  // Each (left, right) pair resolves to bytes 1..2 of a 5-byte key, which are
  // {2, 3} in both keys, so both keys must land in the same partition.
  int[][] offsetPairs = { { 1, -3 }, { 1, 2 }, { -4, -3 } };
  for (int[] offsets : offsetPairs) {
    BinaryPartitioner.setOffsets(conf, offsets[0], offsets[1]);
    BinaryPartitioner<?> partitioner =
        ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
    int firstPartition = partitioner.getPartition(first, null, 10);
    int secondPartition = partitioner.getPartition(second, null, 10);
    assertEquals(firstPartition, secondPartition);
  }
}
public void testDefaultOffsets() {
  Configuration conf = new Configuration();
  BinaryPartitioner<?> partitioner =
      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);

  // Byte-identical keys must always map to the same partition.
  BinaryComparable reference = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  BinaryComparable candidate = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  int referencePartition = partitioner.getPartition(reference, null, 10);
  int candidatePartition = partitioner.getPartition(candidate, null, 10);
  assertEquals(referencePartition, candidatePartition);

  // Keys that differ only in the first byte, then only in the last byte, are
  // expected to be separated (presumably the default offsets span the whole
  // key -- confirm against BinaryPartitioner's defaults).
  byte[][] variants = { { 6, 2, 3, 4, 5 }, { 1, 2, 3, 4, 6 } };
  for (byte[] variant : variants) {
    reference = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
    candidate = new BytesWritable(variant);
    referencePartition = partitioner.getPartition(reference, null, 10);
    candidatePartition = partitioner.getPartition(candidate, null, 10);
    assertTrue(referencePartition != candidatePartition);
  }
}
@Test
public void testDefaultOffsets() {
  final Configuration conf = new Configuration();
  final BinaryPartitioner<?> partitioner =
      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
  final int numPartitions = 10;

  // Byte-identical keys must map to the same partition.
  BinaryComparable base = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  BinaryComparable other = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  assertEquals(partitioner.getPartition(base, null, numPartitions),
      partitioner.getPartition(other, null, numPartitions));

  // Keys differing only in the first byte are expected to be separated.
  base = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  other = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
  assertTrue(partitioner.getPartition(base, null, numPartitions)
      != partitioner.getPartition(other, null, numPartitions));

  // Keys differing only in the last byte are expected to be separated.
  base = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  other = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
  assertTrue(partitioner.getPartition(base, null, numPartitions)
      != partitioner.getPartition(other, null, numPartitions));
}
@Test
public void testCustomOffsets() {
  final Configuration conf = new Configuration();
  final BinaryComparable lhs = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  final BinaryComparable rhs = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });

  // Parallel arrays of (left, right) offsets; every pairing selects bytes 1..2
  // of a 5-byte key, which hold {2, 3} in both keys.
  final int[] leftOffsets = { 1, 1, -4 };
  final int[] rightOffsets = { -3, 2, -3 };
  for (int i = 0; i < leftOffsets.length; i++) {
    BinaryPartitioner.setOffsets(conf, leftOffsets[i], rightOffsets[i]);
    BinaryPartitioner<?> partitioner =
        ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
    assertEquals(partitioner.getPartition(lhs, null, 10),
        partitioner.getPartition(rhs, null, 10));
  }
}
public void testLowerBound() {
  // The two keys differ only in byte 0, which a left offset of 0 includes.
  BinaryComparable original = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  BinaryComparable changedFirst = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });

  Configuration conf = new Configuration();
  BinaryPartitioner.setLeftOffset(conf, 0);
  BinaryPartitioner<?> partitioner =
      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);

  assertTrue(partitioner.getPartition(original, null, 10)
      != partitioner.getPartition(changedFirst, null, 10));
}
public void testUpperBound() {
  // The two keys differ only in byte 4 (the last byte), which a right offset
  // of 4 includes.
  BinaryComparable original = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
  BinaryComparable changedLast = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });

  Configuration conf = new Configuration();
  BinaryPartitioner.setRightOffset(conf, 4);
  BinaryPartitioner<?> partitioner =
      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);

  assertTrue(partitioner.getPartition(original, null, 10)
      != partitioner.getPartition(changedLast, null, 10));
}
/** * Read in the partition file and build indexing data structures. * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and * <tt>total.order.partitioner.natural.order</tt> is not false, a trie * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes * will be built. Otherwise, keys will be located using a binary search of * the partition keyset using the {@link org.apache.hadoop.io.RawComparator} * defined for this job. The input file must be sorted with the same * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys. */ @SuppressWarnings("unchecked") // keytype from conf not static public void setConf(Configuration conf) { try { this.conf = conf; String parts = getPartitionFile(conf); final Path partFile = new Path(parts); final FileSystem fs = (DEFAULT_PATH.equals(parts)) ? FileSystem.getLocal(conf) // assume in DistributedCache : partFile.getFileSystem(conf); Job job = Job.getInstance(conf); Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); K[] splitPoints = readPartitions(fs, partFile, keyClass, conf); if (splitPoints.length != job.getNumReduceTasks() - 1) { throw new IOException("Wrong number of partitions in keyset"); } RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator(); for (int i = 0; i < splitPoints.length - 1; ++i) { if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) { throw new IOException("Split points are out of order"); } } boolean natOrder = conf.getBoolean(NATURAL_ORDER, true); if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) { partitions = buildTrie((BinaryComparable[])splitPoints, 0, splitPoints.length, new byte[0], // Now that blocks of identical splitless trie nodes are // represented reentrantly, and we develop a leaf for any trie // node with only one split point, the only reason for a depth // limit is to refute stack overflow or bloat in the pathological // case where the split points are long and mostly look like bytes // 
iii...iixii...iii . Therefore, we make the default depth // limit large but not huge. conf.getInt(MAX_TRIE_DEPTH, 200)); } else { partitions = new BinarySearchNode(splitPoints, comparator); } } catch (IOException e) { throw new IllegalArgumentException("Can't read partitions file", e); } }
/**
 * Selects a child using the key's byte at this node's level and delegates
 * the lookup to that child.
 * A key whose length is at most this node's level has no byte at that
 * position and is routed to child 0.
 *
 * @param key the key to locate
 * @return the partition index reported by the selected child
 */
public int findPartition(BinaryComparable key) {
  final int depth = getLevel();
  final int slot;
  if (key.getLength() > depth) {
    // Mask so the signed byte is used as an unsigned index in [0, 255].
    slot = key.getBytes()[depth] & 0xFF;
  } else {
    slot = 0;
  }
  return child[slot].findPartition(key);
}
/**
 * Creates the leaf node covering the slice {@code splitPoints[lower, upper)}.
 *
 * @param level the tree depth at this node
 * @param splitPoints the full split point vector, which holds the split point
 *        or points this leaf node should contain
 * @param lower first INcluded element of splitPoints
 * @param upper first EXcluded element of splitPoints
 * @return a leaf node of one of three kinds: no split points (an
 *         {@code UnsplitTrieNode}, whose findPartition returns a canned
 *         index), exactly one split point (a {@code SinglySplitTrieNode},
 *         which compares with a single comparand), or any other count (a
 *         {@code LeafTrieNode}, which does a binary search). The last case
 *         is rare.
 */
private TrieNode LeafTrieNodeFactory(int level, BinaryComparable[] splitPoints,
    int lower, int upper) {
  final int count = upper - lower;
  if (count == 0) {
    return new UnsplitTrieNode(level, lower);
  } else if (count == 1) {
    return new SinglySplitTrieNode(level, splitPoints, lower);
  }
  return new LeafTrieNode(level, splitPoints, lower, upper);
}
/**
 * Use (the specified slice of the array returned by)
 * {@link BinaryComparable#getBytes()} to partition.
 *
 * <p>The configured left and right offsets are reduced to indexes in
 * {@code [0, key.getLength())} with a non-negative modulus, so negative
 * offsets count back from the end of the key (-1 is the last byte), even when
 * their magnitude exceeds the key length. Both ends of the slice are
 * inclusive. An empty key is hashed as an empty byte range.
 *
 * <p>NOTE(review): if the right index resolves before the left index, the
 * length passed to {@code hashBytes} is non-positive; confirm whether that
 * configuration should be rejected instead.
 *
 * @param key the key whose bytes are hashed
 * @param value not used
 * @param numPartitions the total number of partitions; must be positive
 * @return a partition index in {@code [0, numPartitions)}
 */
@Override
public int getPartition(BinaryComparable key, V value, int numPartitions) {
  int length = key.getLength();
  if (length == 0) {
    // No bytes to slice, and "% length" below would divide by zero.
    int emptyHash = WritableComparator.hashBytes(key.getBytes(), 0, 0);
    return (emptyHash & Integer.MAX_VALUE) % numPartitions;
  }
  // Non-negative modulus: matches (offset + length) % length for offsets in
  // [-length, Integer.MAX_VALUE - length], but never yields a negative index
  // and cannot overflow for offsets of large magnitude.
  int leftIndex = ((leftOffset % length) + length) % length;
  int rightIndex = ((rightOffset % length) + length) % length;
  int hash = WritableComparator.hashBytes(key.getBytes(), leftIndex,
      rightIndex - leftIndex + 1);
  return (hash & Integer.MAX_VALUE) % numPartitions;
}