Java class org.apache.kafka.common.serialization.IntegerSerializer usage examples
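Every snippet below routes Integer keys or values through IntegerSerializer, which encodes an Integer as its 4-byte big-endian representation (a null Integer serializes to null). As a minimal, self-contained sketch of the round trip with its IntegerDeserializer counterpart (the topic name is only a placeholder; the primitive serializers ignore it):

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class IntegerSerializerRoundTrip {
    public static void main(String[] args) {
        IntegerSerializer serializer = new IntegerSerializer();
        IntegerDeserializer deserializer = new IntegerDeserializer();

        byte[] bytes = serializer.serialize("demo-topic", 42); // {0x00, 0x00, 0x00, 0x2A}
        Integer value = deserializer.deserialize("demo-topic", bytes);
        System.out.println(value); // prints 42

        serializer.close();
        deserializer.close();
    }
}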

Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyDataNumbers() {
    final String topic = "NumbersTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config);
    for (int value = 0; value < 10000; value++) {
        producer.send(new ProducerRecord<>(topic, value, value));
    }
    producer.flush();
    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream
            .rangeClosed(1, 100000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: talk-kafka-messaging-logs    File: KafkaSlowProducer.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer.send(record);
            });
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
private void produceMessages(final long timestamp)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private void produceStreamTwoInputTo(final String streamTwoInput)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamTwoInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private void produceToStreamOne()
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(10L, 1),
            new KeyValue<>(5L, 2),
            new KeyValue<>(12L, 3),
            new KeyValue<>(15L, 4),
            new KeyValue<>(20L, 5),
            new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            IntegerSerializer.class,
            new Properties()),
        mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationDedupIntegrationTest.java
private void produceMessages(long timestamp)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
Project: fluid    File: KafkaSinkTest.java
@Test
public void testSinkWithInteger() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger expected = new AtomicInteger(2);
    usage.consumeIntegers(topic, 10, 10, TimeUnit.SECONDS,
        latch::countDown,
        (k, v) -> v == expected.getAndIncrement());


    KafkaSink<Integer> sink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );


    Source.from(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .transformPayload(i -> i + 1)
        .to(sink);

    assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}
Project: fluid    File: KafkaSourceTest.java
@Test
public void testSource() throws InterruptedException {
  KafkaUsage usage = new KafkaUsage();
  String topic = UUID.randomUUID().toString();
  List<Integer> results = new ArrayList<>();
  KafkaSource<Integer> source = new KafkaSource<>(vertx,
    getKafkaConfig()
      .put("topic", topic)
      .put("value.serializer", IntegerSerializer.class.getName())
      .put("value.deserializer", IntegerDeserializer.class.getName())
  );
  source
    .transformPayload(i -> i + 1)
    .to(Sink.forEachPayload(results::add));

  AtomicInteger counter = new AtomicInteger();
  usage.produceIntegers(10, null,
    () -> new ProducerRecord<>(topic, counter.getAndIncrement()));

  await().atMost(1, TimeUnit.MINUTES).until(() -> results.size() >= 10);
  assertThat(results).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Project: fluid    File: KafkaSourceTest.java
@Test
public void testCommonHeaders(TestContext context) throws InterruptedException {
  Async async = context.async();
  KafkaUsage usage = new KafkaUsage();
  String topic = UUID.randomUUID().toString();

  KafkaSource<Integer> source = new KafkaSource<>(vertx,
    getKafkaConfig()
      .put("topic", topic)
      .put("value.serializer", IntegerSerializer.class.getName())
      .put("value.deserializer", IntegerDeserializer.class.getName())
  );

  source
    .to(data -> {
      KafkaConsumerRecord record = original(data);
      assertThat(record).isNotNull();
      assertThat(key(data)).isNotNull();
      async.complete();
      return complete();
    });

  usage.produceIntegers(1, null,
    () -> new ProducerRecord<>(topic, "key", 1));
}
Project: fluid    File: KafkaSourceTest.java
@Test
public void testMulticastWithBufferSize() throws InterruptedException {
  KafkaUsage usage = new KafkaUsage();
  String topic = UUID.randomUUID().toString();

  KafkaSource<Integer> source = new KafkaSource<>(vertx,
    getKafkaConfig()
      .put("topic", topic)
      .put("value.serializer", IntegerSerializer.class.getName())
      .put("value.deserializer", IntegerDeserializer.class.getName())
      .put("multicast.buffer.size", 20)
  );

  assertThat(source).isNotNull();
  checkMulticast(usage, topic, source);

}
Project: fluid    File: KafkaSourceTest.java
@Test
public void testMulticastWithTime() throws InterruptedException {
  KafkaUsage usage = new KafkaUsage();
  String topic = UUID.randomUUID().toString();

  KafkaSource<Integer> source = new KafkaSource<>(vertx,
    getKafkaConfig()
      .put("topic", topic)
      .put("value.serializer", IntegerSerializer.class.getName())
      .put("value.deserializer", IntegerDeserializer.class.getName())
      .put("multicast.buffer.period.ms", 2000)
  );
  assertThat(source).isNotNull();
  checkMulticast(usage, topic, source);

}
Project: rmap    File: SimpleKafkaIT.java
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps,
            IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
    template.setDefaultTopic("topic3");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testAutoCommit");
}
Project: javabase    File: KafKaProducerAPITest.java
/**
 * Get a KafkaProducer.
 *
 * Common producer-side configuration:
 * bootstrap.servers: the Kafka cluster connection string; may contain multiple host:port entries
 * acks: the broker acknowledgement mode, one of three values:
 *   0: no acknowledgement; the client does not wait for the broker after sending
 *   1: leader acknowledgement; the leader returns a confirmation as soon as it receives the message
 *   all: full-cluster acknowledgement; the leader waits until every in-sync follower has confirmed receipt before responding
 * Pick the mode according to how important the messages are. The default is 1.
 * retries: how many times the producer retries a failed send. Defaults to 0
 * batch.size: when many messages target the same partition, the producer packs them into batches; 0 disables batching. Defaults to 16384 bytes
 * linger.ms: how many milliseconds to wait before sending, used together with batch.size. Under light load, linger.ms lets the producer accumulate more messages per batch and save network resources. Defaults to 0
 * key.serializer/value.serializer: serializer classes for the message key and value, chosen to match their types
 * buffer.memory: size of the message buffer pool. Unsent messages are held in producer memory; if messages are produced faster than they are sent, send requests block once the pool is full. Defaults to 33554432 bytes (32 MB)
 *
 * @return the configured KafkaProducer
 */
private static KafkaProducer<Integer, String> getProducer() {
    Properties properties = new Properties();
    // bootstrap.servers
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.56.118.135:9092,123.56.118.135:9093");
    // client.id
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerTest");
    // batch.size: messages bound for the same partition are packed into batches; 0 disables batching. Default 16384 bytes
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // linger.ms: wait before sending, used with batch.size to accumulate larger batches under light load. Default 0
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
    // retries: producer-side retry count on send failure. Default 0
    properties.put(ProducerConfig.RETRIES_CONFIG, 0);
    // buffer.memory: unsent messages are buffered in producer memory; sends block once the pool is full. Default 33554432 bytes
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    // key and value serializer classes
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(properties);
}
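The Javadoc above describes the acks modes but the snippet never sets acks, and with linger.ms at 5000 a batch can sit for up to five seconds before leaving the client. A minimal usage sketch (the topic name "test-topic" is hypothetical):

KafkaProducer<Integer, String> producer = getProducer();
producer.send(new ProducerRecord<>("test-topic", 1, "hello"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed after any configured retries
    }
});
producer.flush(); // with linger.ms=5000, flush() pushes the pending batch out immediately
producer.close();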
Project: incubator-pulsar    File: ProducerExample.java
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
        log.info("Message {} sent successfully", i);
    }

    producer.close();
}
Project: KafkaExample    File: WordCountTopology.java
public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words")
                .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE")
                .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "WordCountProcessor")
//              .connectProcessorAndStateStores("WordCountProcessor", "Counts")
                .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor");

        KafkaStreams stream = new KafkaStreams(builder, props);
        stream.start();
        System.in.read();
        stream.close();
        stream.cleanUp();
    }
Project: beam    File: KafkaIOTest.java
MockProducerWrapper() {
  producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
  mockProducer = new MockProducer<Integer, Long>(
    false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
    new IntegerSerializer(),
    new LongSerializer()) {

    // override flush() so that it does not complete all the waiting sends, giving a chance to
    // ProducerCompletionThread to inject errors.

    @Override
    public void flush() {
      while (completeNext()) {
        // there are some uncompleted records. let the completion thread handle them.
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // ok to retry.
        }
      }
    }
  };

  // Add the producer to the global map so that producer factory function can access it.
  assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
Project: kafka-tests    File: GeneratorProducer.java
public GeneratorProducer(Properties configuration, String topic, int instances, int messagesPerGroup,
                         Duration producerSlowDown, Duration shutdownTimeout,
                         StateDao stateDao) {
    logger.info("Starting instance");

    this.topic = topic;
    this.messagesPerGroup = messagesPerGroup;
    this.producerSlowDown = producerSlowDown;
    this.shutdownTimeout = shutdownTimeout;
    this.stateDao = stateDao;

    this.producer = new KafkaProducer<>(configuration, new StringSerializer(), new IntegerSerializer());

    this.executor = Executors.newFixedThreadPool(instances,
            new ThreadFactoryBuilder()
                    .setNameFormat(getClass().getSimpleName() + "-" + instanceName + "-worker-" + "%d")
                    .setDaemon(false)
                    .build());

    IntStream.range(0, instances)
            .forEach((v) -> executor.submit(this));
}
Project: talk-kafka-messaging-logs    File: ProducerAckOne.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "1");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    RecordsProducer.produce("kafka_producer_ack_one_latency", producer, TOPIC);

    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProducerAckZero.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "0");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    RecordsProducer.produce("kafka_producer_ack_zero_latency", producer, TOPIC);

    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProducerAckAll.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    RecordsProducer.produce("kafka_producer_ack_all_latency", producer, TOPIC);

    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStateTaskTest.java
@Test
public void shouldProcessRecordsForOtherTopic() throws Exception {
    final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
    globalStateTask.initialize();
    globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, integerBytes, integerBytes));
    assertEquals(1, sourceTwo.numReceived);
    assertEquals(0, sourceOne.numReceived);
}
Project: project-template    File: KafkaConfig.java
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}
Project: post-to-kafka    File: KafkaConfig.java
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20000000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    return props;
}
Project: javabase    File: KafkaProducerTest.java
/**
 * Construct a KafkaProducer.
 *
 * @return KafkaProducer
 */
private KafkaProducer<Integer, String> getProducer() {
    Properties properties = new Properties();
    //bootstrap.servers
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.56.118.135:9092");
    //client.id
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerTest");
    // key and value serializer classes
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(properties);
}
Project: incubator-pulsar    File: KafkaProducerTest.java
@Test
public void testSimpleProducer() throws Exception {
    String topic = "persistent://sample/standalone/ns/testSimpleProducer";

    Consumer pulsarConsumer = pulsarClient.subscribe(topic, "my-subscription");

    Properties props = new Properties();
    props.put("bootstrap.servers", lookupUrl.toString());

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    }

    producer.flush();
    producer.close();

    for (int i = 0; i < 10; i++) {
        Message msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
Project: incubator-pulsar    File: KafkaProducerTest.java
@Test(timeOut = 10000)
public void testProducerCallback() throws Exception {
    String topic = "persistent://sample/standalone/ns/testProducerCallback";

    Consumer pulsarConsumer = pulsarClient.subscribe(topic, "my-subscription");

    Properties props = new Properties();
    props.put("bootstrap.servers", lookupUrl.toString());

    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);

    CountDownLatch counter = new CountDownLatch(10);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
            assertEquals(metadata.topic(), topic);
            assertNull(exception);

            counter.countDown();
        });
    }

    counter.await();

    for (int i = 0; i < 10; i++) {
        Message msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }

    producer.close();
}
Project: web    File: ProducerConfiguration.java
@Bean
public Map<String, Object> producerConfigs() {
    HashMap<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections
    // to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // value to block, after which it will throw a TimeoutException
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

    return props;
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSink() throws Exception {
  // Simply read from kafka source and write to kafka sink. Then verify the records
  // are correctly published to mock kafka producer.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withKeySerializer(IntegerSerializer.class)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
  }
}
Project: eventasia    File: EventasiaKafkaConfig.java
@Bean
public Map<String, Object> producerConfigs() {
    //FIXME: 12factorize
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}
Project: hazelcast-jet    File: KafkaTestSupport.java
private KafkaProducer<Integer, String> getProducer() {
    if (producer == null) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKER_HOST + ':' + brokerPort);
        producerProps.setProperty("key.serializer", IntegerSerializer.class.getCanonicalName());
        producerProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
        producer = new KafkaProducer<>(producerProps);
    }
    return producer;
}
Project: kafka-examples    File: StreamingAvg.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();

    props.put(StreamingConfig.JOB_ID_CONFIG, "moving-avg-example");
    props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    StreamingConfig config = new StreamingConfig(props);

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, Integer> prices = builder.stream("ks_prices");

    KTable<String, String> names = builder.table("ks_names");

    KStream<String, Integer> namedPrices = prices.leftJoin(names, (price, name) -> {
        return new NamedPrice(name, price);
    }).map((ticket, namedPrice) -> new KeyValue<String, Integer>(namedPrice.name, namedPrice.price));



    KTable<Windowed<String>, AvgValue> tempTable = namedPrices.<AvgValue, TumblingWindow>aggregateByKey(
            () -> new AvgAggregator<String, Integer, AvgValue>(),
            TumblingWindows.of("avgWindow").with(10000),
            new StringSerializer(), new AvgValueSerializer(),
            new StringDeserializer(), new AvgValueDeserializer());

    // Should work after we implement "aggregateByKey"
    KTable<Windowed<String>, Double> avg = tempTable.<Double>mapValues((v) -> ((double) v.sum / v.count));

    avg.to("ks_avg_prices");


    KafkaStreaming kstream = new KafkaStreaming(builder, config);
    kstream.start();
}
Project: debezium-proto    File: KafkaClusterTest.java
@Test
public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
    Testing.Debug.enable();
    final String topicName = "topicA";
    final CountDownLatch completion = new CountDownLatch(1);
    final int numMessages = 3;
    final AtomicLong messagesRead = new AtomicLong(0);

    // Start a cluster and create a topic ...
    cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
    cluster.createTopics(topicName);

    // Consume messages asynchronously ...
    Stopwatch sw = Stopwatch.reusable().start();
    cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
        messagesRead.incrementAndGet();
        return true;
    });

    // Produce some messages interactively ...
    cluster.useTo()
           .createProducer("manual", new StringSerializer(), new IntegerSerializer())
           .write(topicName, "key1", 1)
           .write(topicName, "key2", 2)
           .write(topicName, "key3", 3)
           .close();

    // Wait for the consumer to complete ...
    if (completion.await(10, TimeUnit.SECONDS)) {
        sw.stop();
        Testing.debug("The consumer completed normally in " + sw.durations());
    } else {
        Testing.debug("Consumer did not completed normally");
    }

    assertThat(messagesRead.get()).isEqualTo(numMessages);
}
Project: debezium-proto    File: KafkaClusterTest.java
@Test
public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
    Testing.Debug.enable();
    final String topicName = "topicA";
    final CountDownLatch completion = new CountDownLatch(2);
    final int numMessages = 3;
    final AtomicLong messagesRead = new AtomicLong(0);

    // Start a cluster and create a topic ...
    cluster.deleteDataUponShutdown(false).addBrokers(2).startup();
    cluster.createTopics(topicName);

    // Consume messages asynchronously ...
    Stopwatch sw = Stopwatch.reusable().start();
    cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
        messagesRead.incrementAndGet();
        return true;
    });

    // Produce some messages asynchronously ...
    cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> {
        producer.write(topicName, "key1", 1);
        producer.write(topicName, "key2", 2);
        producer.write(topicName, "key3", 3);
        completion.countDown();
    });

    // Wait for the consumer to complete ...
    if (completion.await(10, TimeUnit.SECONDS)) {
        sw.stop();
        Testing.debug("The consumer completed normally in " + sw.durations());
    } else {
        Testing.debug("Consumer did not completed normally");
    }
    assertThat(messagesRead.get()).isEqualTo(numMessages);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
static void generate(final String kafka) throws Exception {

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                isRunning = false;
            }
        });

        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps);

        final Random rand = new Random(System.currentTimeMillis());

        int numRecordsProduced = 0;
        while (isRunning) {
            final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
            final int value = rand.nextInt(10000);

            final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                        Exit.exit(1);
                    }
                }
            });

            numRecordsProduced++;
            if (numRecordsProduced % 1000 == 0) {
                System.out.println(numRecordsProduced + " records produced");
            }
            Utils.sleep(rand.nextInt(50));
        }
        producer.close();
        System.out.println(numRecordsProduced + " records produced");
    }