Java 类org.apache.kafka.clients.producer.internals.DefaultPartitioner 实例源码

项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Sends nine records to {@code topic1} using the shared {@code streamPartitioner} and then
 * checks the offset the collector reports for partitions 0, 1 and 2.
 */
@Test
public void testStreamPartitioner() {
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestStreamPartitioner");

    // Keys are sent in exactly this order; every record carries the value "0" and a null fourth argument.
    final String[] keys = {"3", "9", "27", "81", "243", "28", "82", "244", "245"};
    for (final String key : keys) {
        collector.send("topic1", key, "0", null, stringSerializer, stringSerializer, streamPartitioner);
    }

    final Map<TopicPartition, Long> recorded = collector.offsets();

    // Index i holds the offset expected for partition i of topic1.
    final long[] expectedOffsets = {4L, 2L, 0L};
    for (int partition = 0; partition < expectedOffsets.length; partition++) {
        assertEquals(Long.valueOf(expectedOffsets[partition]), recorded.get(new TopicPartition("topic1", partition)));
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Uses a producer whose first {@code send} throws a {@code TimeoutException} and whose later
 * calls delegate to {@code MockProducer}; the collector is then expected to report offset 0
 * for partition 0 of {@code topic1}.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger sendCalls = new AtomicInteger(0);

    // Throws on the very first call only; every later call goes to the real MockProducer.send.
    final MockProducer failsOnce = new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
        @Override
        public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
            final boolean firstCall = sendCalls.getAndIncrement() == 0;
            if (firstCall) {
                throw new TimeoutException();
            }
            return super.send(record, callback);
        }
    };
    final RecordCollectorImpl collector = new RecordCollectorImpl(failsOnce, "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    assertEquals(Long.valueOf(0L), collector.offsets().get(new TopicPartition("topic1", 0)));
}
项目:streamx    文件:HdfsSinkConnectorConfig.java   
/**
 * Decides whether the configuration entry {@code name} should be visible, based on the
 * partitioner class configured under {@code PARTITIONER_CLASS_CONFIG}.
 *
 * <ul>
 *   <li>{@code DefaultPartitioner}: nothing is visible.</li>
 *   <li>Any {@code FieldPartitioner}: only {@code PARTITION_FIELD_NAME_CONFIG}.</li>
 *   <li>{@code DailyPartitioner} / {@code HourlyPartitioner}: locale and timezone.</li>
 *   <li>Any other {@code TimeBasedPartitioner}: partition duration, path format, locale and timezone.</li>
 * </ul>
 *
 * @param name the configuration key whose visibility is being queried
 * @param connectorConfigs the current connector configuration values
 * @return {@code true} if {@code name} is relevant for the configured partitioner
 * @throws ConfigException if the partitioner class cannot be loaded (the
 *         {@link ClassNotFoundException} is attached as the cause) or is none of the
 *         partitioner kinds listed above
 */
@Override
public boolean visible(String name, Map<String, Object> connectorConfigs) {
  String partitionerName = (String) connectorConfigs.get(PARTITIONER_CLASS_CONFIG);
  try {
    @SuppressWarnings("unchecked")
    Class<? extends Partitioner> partitioner = (Class<? extends Partitioner>) Class.forName(partitionerName);
    if (classNameEquals(partitionerName, DefaultPartitioner.class)) {
      return false;
    } else if (FieldPartitioner.class.isAssignableFrom(partitioner)) {
      // subclass of FieldPartitioner
      return name.equals(PARTITION_FIELD_NAME_CONFIG);
    } else if (TimeBasedPartitioner.class.isAssignableFrom(partitioner)) {
      // subclass of TimeBasedPartitioner
      if (classNameEquals(partitionerName, DailyPartitioner.class) || classNameEquals(partitionerName, HourlyPartitioner.class)) {
        return name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
      } else {
        return name.equals(PARTITION_DURATION_MS_CONFIG) || name.equals(PATH_FORMAT_CONFIG) || name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
      }
    } else {
      throw new ConfigException("Not a valid partitioner class: " + partitionerName);
    }
  } catch (ClassNotFoundException e) {
    // ConfigException is built from the message alone here, so the original failure is
    // attached explicitly to keep the class-loading stack trace available.
    ConfigException notFound = new ConfigException("Partitioner class not found: " + partitionerName);
    notFound.initCause(e);
    throw notFound;
  }
}
项目:otj-logging    文件:PartitionSpreadTest.java   
/**
 * Partitions {@code messages} generated keys with a {@code DefaultPartitioner} and asserts that
 * every partition that received keys is within 5% of the even share
 * {@code messages / partitions}.
 */
@Test
public void testPartitionSpread() throws Exception {
    final Multiset<Integer> hitsPerPartition = TreeMultiset.create();
    final Cluster clusterUnderTest = Cluster.empty();
    try (Partitioner partitioner = new DefaultPartitioner()) {
        final PartitionKeyGenerator keys = new PartitionKeyGenerator();

        mockPartitions(clusterUnderTest);

        int sent = 0;
        while (sent < messages) {
            hitsPerPartition.add(partitioner.partition("test", null, keys.next(), null, null, clusterUnderTest));
            sent++;
        }

        final int expected = messages / partitions;
        final double threshold = expected * 0.05;

        for (Multiset.Entry<Integer> entry : hitsPerPartition.entrySet()) {
            final int count = entry.getCount();
            final int offBy = Math.abs(count - expected);
            final String failure = "Partition " + entry.getElement() + " had " + count + " elements, expected " + expected + ", threshold is " + threshold;
            assertTrue(failure, offBy < threshold);
        }
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:WindowedStreamPartitionerTest.java   
/**
 * For ten random integer keys, checks that {@code WindowedStreamPartitioner} picks the same
 * partition for the windowed key (over nine different windows) as {@code DefaultPartitioner}
 * picks for the plain key.
 */
@Test
public void testCopartitioning() {
    final Random random = new Random();
    final DefaultPartitioner producerPartitioner = new DefaultPartitioner();
    final WindowedSerializer<Integer> windowedKeySerializer = new WindowedSerializer<>(intSerializer);
    final WindowedStreamPartitioner<Integer, String> windowedPartitioner =
            new WindowedStreamPartitioner<>(topicName, windowedKeySerializer);

    for (int round = 0; round < 10; round++) {
        final Integer key = random.nextInt();
        final String value = key.toString();
        final byte[] keyBytes = intSerializer.serialize(topicName, key);
        final byte[] valueBytes = stringSerializer.serialize(topicName, value);

        // Reference partition chosen for the un-windowed key.
        final Integer expected = producerPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);

        for (int w = 1; w < 10; w++) {
            final long windowStart = 10 * w;
            final long windowEnd = 20 * w;
            final Windowed<Integer> windowedKey = new Windowed<>(key, new TimeWindow(windowStart, windowEnd));

            final Integer actual = windowedPartitioner.partition(windowedKey, value, infos.size());
            assertEquals(expected, actual);
        }
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Sends records with an explicitly chosen partition and checks the offsets the collector
 * reports per partition after a first round (3/2/1 records) and after one more record each.
 */
@Test
public void testSpecificPartition() {
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestSpecificPartition");

    // First round: index i holds the number of records sent to partition i.
    final int[] firstRound = {3, 2, 1};
    for (int partition = 0; partition < firstRound.length; partition++) {
        for (int i = 0; i < firstRound[partition]; i++) {
            collector.send("topic1", "999", "0", partition, null, stringSerializer, stringSerializer);
        }
    }

    final Map<TopicPartition, Long> recorded = collector.offsets();

    final long[] afterFirstRound = {2L, 1L, 0L};
    for (int partition = 0; partition < afterFirstRound.length; partition++) {
        assertEquals(Long.valueOf(afterFirstRound[partition]), recorded.get(new TopicPartition("topic1", partition)));
    }

    // Second round: one more record per partition.
    for (int partition = 0; partition < 3; partition++) {
        collector.send("topic1", "999", "0", partition, null, stringSerializer, stringSerializer);
    }

    // The map fetched above is checked again without a new offsets() call, so this part of the
    // test expects that map to reflect the sends made after it was obtained.
    final long[] afterSecondRound = {3L, 2L, 1L};
    for (int partition = 0; partition < afterSecondRound.length; partition++) {
        assertEquals(Long.valueOf(afterSecondRound[partition]), recorded.get(new TopicPartition("topic1", partition)));
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * With a producer whose {@code send} throws {@code TimeoutException} on every call, sending a
 * record through the collector is expected to end in a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
    // Never succeeds: each send attempt times out.
    final MockProducer alwaysTimingOut = new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
        @Override
        public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
            throw new TimeoutException();
        }
    };
    final RecordCollector collector = new RecordCollectorImpl(alwaysTimingOut, "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Uses a producer that completes every send callback with an exception; two consecutive
 * collector sends are then expected to produce a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
    // Reports a failure through the callback instead of storing the record.
    final MockProducer failingViaCallback = new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
        @Override
        public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
            callback.onCompletion(null, new Exception());
            return null;
        }
    };
    final RecordCollector collector = new RecordCollectorImpl(failingViaCallback, "test");

    for (int call = 0; call < 2; call++) {
        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Uses a producer that completes every send callback with an exception; a send followed by
 * {@code flush()} is then expected to produce a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
    // Reports a failure through the callback instead of storing the record.
    final MockProducer failingViaCallback = new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
        @Override
        public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
            callback.onCompletion(null, new Exception());
            return null;
        }
    };
    final RecordCollector collector = new RecordCollectorImpl(failingViaCallback, "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.flush();
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Uses a producer that completes every send callback with an exception; a send followed by
 * {@code close()} is then expected to produce a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    // Reports a failure through the callback instead of storing the record.
    final MockProducer failingViaCallback = new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
        @Override
        public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
            callback.onCompletion(null, new Exception());
            return null;
        }
    };
    final RecordCollector collector = new RecordCollectorImpl(failingViaCallback, "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.close();
}
项目:kafka-0.11.0.0-src-with-comment    文件:RecordCollectorTest.java   
/**
 * Uses a producer that reports no partitions for any topic; sending a record through the
 * collector is then expected to produce a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowIfTopicIsUnknown() {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                // Type-safe empty list instead of the raw Collections.EMPTY_LIST constant.
                return Collections.<PartitionInfo>emptyList();
            }

        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
项目:kafka-0.11.0.0-src-with-comment    文件:MockProducerTest.java   
/**
 * Builds a two-partition cluster for {@code topic}, sends one keyed record through a
 * {@code MockProducer} using {@code DefaultPartitioner}, and checks the chosen partition as
 * well as that {@code clear()} empties the producer history.
 */
@Test
public void testPartitioner() throws Exception {
    // Partitions 0 and 1 of the topic; the remaining PartitionInfo arguments are left null.
    final Cluster twoPartitionCluster = new Cluster(
            null,
            new ArrayList<Node>(0),
            asList(new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    final MockProducer<String, String> producer = new MockProducer<>(
            twoPartitionCluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());

    final ProducerRecord<String, String> message = new ProducerRecord<>(topic, "key", "value");
    final Future<RecordMetadata> pending = producer.send(message);
    assertEquals("Partition should be correct", 1, pending.get().partition());

    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
项目:kafka    文件:MockProducerTest.java   
/**
 * Builds a two-partition cluster for {@code topic}, sends one keyed record through a
 * {@code MockProducer} using {@code DefaultPartitioner}, and checks the chosen partition as
 * well as that {@code clear()} empties the producer history.
 */
@Test
public void testPartitioner() throws Exception {
    // Partitions 0 and 1 of the topic; the remaining PartitionInfo arguments are left null.
    final Cluster twoPartitionCluster = new Cluster(
            new ArrayList<Node>(0),
            asList(new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null)),
            Collections.<String>emptySet());
    final MockProducer<String, String> producer = new MockProducer<String, String>(
            twoPartitionCluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());

    final ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, "key", "value");
    final Future<RecordMetadata> pending = producer.send(message);
    assertEquals("Partition should be correct", 1, pending.get().partition());

    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
项目:eventasia    文件:EventasiaKafkaConfig.java   
/**
 * Assembles the Kafka producer settings: broker list, acks, retries, the default partitioner,
 * an integer key serializer and a string value serializer.
 *
 * @return a new mutable map of producer configuration entries
 */
@Bean
public Map<String, Object> producerConfigs() {
    //FIXME: 12factorize
    final Map<String, Object> config = new HashMap<>();

    // Connection and delivery settings.
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
    config.put(ProducerConfig.ACKS_CONFIG, acks);
    config.put(ProducerConfig.RETRIES_CONFIG, retries);

    // BATCH_SIZE_CONFIG (16384), LINGER_MS_CONFIG (1) and BUFFER_MEMORY_CONFIG (33554432)
    // are intentionally not set in this map.

    // Partitioning and serialization.
    config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

    return config;
}
项目:kafka-0.11.0.0-src-with-comment    文件:DefaultStreamPartitioner.java   
/**
 * Stores the given key serializer, cluster and topic, and creates a new
 * {@code DefaultPartitioner} instance for this stream partitioner.
 *
 * @param keySerializer serializer kept for the record keys
 * @param cluster cluster kept by this partitioner
 * @param topic topic name kept by this partitioner
 */
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
    this.defaultPartitioner = new DefaultPartitioner();
    this.topic = topic;
    this.cluster = cluster;
    this.keySerializer = keySerializer;
}
项目:kafka-0.11.0.0-src-with-comment    文件:MockProducer.java   
/**
 * Create a new mock producer backed by an empty cluster ({@code Cluster.empty()}), with the given
 * autoComplete setting and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param autoComplete passed through unchanged to the delegate constructor
 * @param keySerializer the serializer to use for keys
 * @param valueSerializer the serializer to use for values
 */
public MockProducer(final boolean autoComplete,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
项目:kafka    文件:MockProducer.java   
/**
 * Create a new mock producer backed by an empty cluster ({@code Cluster.empty()}), with the given
 * autoComplete setting and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param autoComplete passed through unchanged to the delegate constructor
 * @param keySerializer the serializer to use for keys
 * @param valueSerializer the serializer to use for values
 */
public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}