Java class org.apache.kafka.common.serialization.StringSerializer — example source code
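The snippets below are collected from open-source projects and show how StringSerializer is wired into Kafka producers, consumers, and Kafka Streams utilities. As a baseline, here is a minimal, self-contained sketch of what the serializer itself does (topic name and payload are placeholders):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringSerializerSketch {
  public static void main(String[] args) {
    // StringSerializer encodes a String as bytes (UTF-8 unless configured otherwise).
    try (StringSerializer serializer = new StringSerializer()) {
      byte[] bytes = serializer.serialize("any-topic", "hello");
      System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints "hello"
    }
  }
}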

Project: testcontainers-java-module-confluent-platform    File: HelloProducer.java
public void createProducer(String bootstrapServer) {
  long numberOfEvents = 5;

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  KafkaProducer<String, String> producer = new KafkaProducer<>(
      props);

  for (int i = 0; i < numberOfEvents; i++) {
    String key = "testContainers";
    String value = "AreAwesome";
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "hello_world_topic", key, value);
    try {
      producer.send(record).get();
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    System.out.printf("key = %s, value = %s\n", key, value);
  }

  producer.close();
}
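
The example above blocks on each send via get(); for reference, a non-blocking sketch of the same send (not part of the original project) passes a Callback instead:

// Non-blocking alternative: handle the result in a callback.
producer.send(record, (metadata, exception) -> {
  if (exception != null) {
    exception.printStackTrace();
  } else {
    System.out.printf("sent to %s-%d@%d%n",
        metadata.topic(), metadata.partition(), metadata.offset());
  }
});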
Project: ja-micro    File: KafkaPublisher.java
public void initialize(String servers) {
    if (isInitialized.get()) {
        logger.warn("Already initialized");
        return;
    }

    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    properties.forEach(props::put);

    realProducer = new KafkaProducer<>(props);
    isInitialized.set(true);
}
Project: docker-kafka-streams    File: MetricsResource.java
/**
 * Metrics for a machine
 *
 * @param machine the machine identifier from the request path
 * @return the metric
 */
@GET
@Path("{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getMachineMetric(@PathParam("machine") String machine) {
    LOGGER.log(Level.INFO, "Fetching metrics for machine {0}", machine);

    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();

    Metrics metrics = null;

    StreamsMetadata metadataForMachine = ks.metadataForKey(storeName, machine, new StringSerializer());

    if (metadataForMachine.host().equals(thisInstance.host()) && metadataForMachine.port() == thisInstance.port()) {
        LOGGER.log(Level.INFO, "Querying local store for machine {0}", machine);
        metrics = getLocalMetrics(machine);
    } else {
        //LOGGER.log(Level.INFO, "Querying remote store for machine {0}", machine);
        String url = "http://" + metadataForMachine.host() + ":" + metadataForMachine.port() + "/metrics/remote/" + machine;
        metrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
        LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, metrics});
    }

    return Response.ok(metrics).build();
}
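
Note: metadataForKey serializes the key with the supplied StringSerializer and hashes it the way the default partitioner would, which is how the Streams instance hosting that key's store partition is located; the method then either queries the local store or forwards the request over HTTP.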
Project: j1st-mqtt    File: KafkaApplicationCommunicator.java
@Override
public void init(AbstractConfiguration config, ApplicationListenerFactory factory) {
    init(config);

    logger.trace("Initializing Kafka consumer ...");

    // consumer config
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("group.id", config.getString("group.id"));
    props.put("enable.auto.commit", "true");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", InternalMessageSerializer.class.getName());

    // consumer
    this.consumer = new KafkaConsumer<>(props);

    // consumer worker
    this.worker = new KafkaApplicationWorker(this.consumer, APPLICATION_TOPIC, factory.newListener());
    this.executor.submit(this.worker);
}
Project: j1st-mqtt    File: KafkaBrokerCommunicator.java
@Override
public void init(AbstractConfiguration config, String brokerId, BrokerListenerFactory factory) {
    init(config);

    BROKER_TOPIC = BROKER_TOPIC_PREFIX + "." + brokerId;

    logger.trace("Initializing Kafka consumer ...");

    // consumer config
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("group.id", UUIDs.shortUuid());
    props.put("enable.auto.commit", "true");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", InternalMessageSerializer.class.getName());

    // consumer
    this.consumer = new KafkaConsumer<>(props);

    // consumer worker
    this.worker = new KafkaBrokerWorker(this.consumer, BROKER_TOPIC, factory.newListener());
    this.executor.submit(this.worker);
}
Project: j1st-mqtt    File: KafkaCommunicator.java
protected void init(AbstractConfiguration config) {
    BROKER_TOPIC_PREFIX = config.getString("communicator.broker.topic");
    APPLICATION_TOPIC = config.getString("communicator.application.topic");

    logger.trace("Initializing Kafka producer ...");

    // producer config
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("acks", config.getString("acks"));
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", InternalMessageSerializer.class.getName());

    // producer
    this.producer = new KafkaProducer<>(props);

    // consumer executor
    this.executor = Executors.newSingleThreadExecutor();
}
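
InternalMessageSerializer above is project-specific. Purely as an illustration (not the project's actual implementation), a custom value serializer generally has this shape — here assuming a Jackson-based JSON mapping:

import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical JSON value serializer; the real InternalMessageSerializer may differ.
public class JsonValueSerializer<T> implements Serializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize record value for topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}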
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyData() {
    final String topic = "TestTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<String, String> producer = new KafkaProducer<>(config);
    for (int charCode = 65; charCode < 91; charCode++) {
        final char[] key = new char[1];
        key[0] = (char) charCode;

        producer.send(new ProducerRecord<>(topic, new String(key), new String(key)));
    }
    producer.flush();
    producer.close();
}
Project: post-kafka-rewind-consumer-offset    File: KafkaSimpleProducer.java
public static void main(String[] args) {
    Configuration config = ConfigurationProvider.getConfiguration();

    String bootstrapServers = config.getOrDefault("kafka.bootstrap_servers", "localhost:9092");

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100)
            .boxed()
            .map(number -> new ProducerRecord<>(
                    "topic-1",
                    number.toString(),
                    number.toString()))
            .map(record -> producer.send(record))
            .forEach(result -> printMetadata(result));
    producer.close();
}
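
printMetadata is defined elsewhere in that project; a plausible minimal version (an assumption, not the project's code) blocks on the Future and prints the record metadata:

// Hypothetical helper; the project's actual printMetadata may differ.
// Assumes java.util.concurrent.Future and ExecutionException are imported.
private static void printMetadata(Future<RecordMetadata> result) {
    try {
        RecordMetadata metadata = result.get();
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}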
Project: kafka-streams-example    File: MetricsResource.java
/**
 * Metrics for a machine
 *
 * @param machine the machine identifier from the request path
 * @return the metric
 */
@GET
@Path("{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getMachineMetric(@PathParam("machine") String machine) {
    LOGGER.log(Level.INFO, "Fetching metrics for machine {0}", machine);

    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();

    Metrics metrics = null;

    StreamsMetadata metadataForMachine = ks.metadataForKey(storeName, machine, new StringSerializer());

    if (metadataForMachine.host().equals(thisInstance.host()) && metadataForMachine.port() == thisInstance.port()) {
        LOGGER.log(Level.INFO, "Querying local store for machine {0}", machine);
        metrics = getLocalMetrics(machine);
    } else {
        //LOGGER.log(Level.INFO, "Querying remote store for machine {0}", machine);
        String url = "http://" + metadataForMachine.host() + ":" + metadataForMachine.port() + "/metrics/remote/" + machine;
        metrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
        LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, metrics});
    }

    return Response.ok(metrics).build();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number -> new ProducerRecord<>(
                    TOPIC, //topic
                    number.toString(), //key
                    UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
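
Only the key goes through StringSerializer here; the value is Avro-encoded up front by UserAvroSerdes.serialize, so the producer simply ships the resulting bytes with ByteArraySerializer.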
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: KafkaProducer.java
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("KafkaProducerSystem");

    final Materializer materializer = ActorMaterializer.create(system);

    final ProducerSettings<byte[], String> producerSettings =
            ProducerSettings
                    .create(system, new ByteArraySerializer(), new StringSerializer())
                    .withBootstrapServers("localhost:9092");

    CompletionStage<Done> done =
            Source.range(1, 100)
                    .map(n -> n.toString())
                    .map(elem ->
                            new ProducerRecord<byte[], String>(
                                    "topic1-ts",
                                    0,
                                    Instant.now().toEpochMilli(), // Kafka record timestamps are epoch milliseconds
                                    null,
                                    elem))
                    .runWith(Producer.plainSink(producerSettings), materializer);

    done.whenComplete((d, ex) -> System.out.println("sent"));
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream
            .rangeClosed(1, 100000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: KafkaSlowProducer.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer.send(record);
            });
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
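
Note the pairing: whatever the producer writes with StringSerializer/ByteArraySerializer, the consumer reading the same topic back must decode with the matching StringDeserializer/ByteArrayDeserializer, as this method does.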
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
private void produceMessages(final long timestamp)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private void produceStreamTwoInputTo(final String streamTwoInput)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamTwoInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalKTableIntegrationTest.java
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                    new KeyValue<>("a", 1L),
                    new KeyValue<>("b", 2L),
                    new KeyValue<>("c", 3L),
                    new KeyValue<>("d", 4L),
                    new KeyValue<>("e", 5L)),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    StringSerializer.class,
                    LongSerializer.class,
                    new Properties()),
            mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalKTableIntegrationTest.java
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "F"),
                    new KeyValue<>(2L, "G"),
                    new KeyValue<>(3L, "H"),
                    new KeyValue<>(4L, "I"),
                    new KeyValue<>(5L, "J")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationDedupIntegrationTest.java
private void produceMessages(long timestamp)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: ResetIntegrationTest.java
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
Project: kafka-0.11.0.0-src-with-comment    File: QueryableStateIntegrationTest.java
@Override
public void run() {
    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {

        while (getCurrIteration() < numIterations && !shutdown) {
            for (final String value : inputValues) {
                producer.send(new ProducerRecord<String, String>(topic, value));
            }
            incrementInteration();
        }
    }
}
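
When serializer instances are passed directly to the KafkaProducer constructor, as here, they take precedence, so the KEY/VALUE_SERIALIZER_CLASS_CONFIG entries in the Properties are effectively redundant.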
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedSerializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test key[value].serializer.inner.class takes precedence over serializer.inner.class
    WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    windowedSerializer.configure(props, true);
    Serializer<?> inner = windowedSerializer.innerSerializer();
    assertNotNull("Inner serializer should be not null", inner);
    assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
    // test serializer.inner.class
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.remove("key.serializer.inner.class");
    props.remove("value.serializer.inner.class");
    WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>();
    windowedSerializer1.configure(props, false);
    Serializer<?> inner1 = windowedSerializer1.innerSerializer();
    assertNotNull("Inner serializer should be not null", inner1);
    assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedDeserializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test key[value].deserializer.inner.class takes precedence over deserializer.inner.class
    WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    windowedDeserializer.configure(props, true);
    Deserializer<?> inner = windowedDeserializer.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner);
    assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
    // test deserializer.inner.class
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.remove("key.deserializer.inner.class");
    props.remove("value.deserializer.inner.class");
    WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
    windowedDeserializer1.configure(props, false);
    Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner1);
    assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void testInterceptorConstructClose() throws Exception {
    try {
        Properties props = new Properties();
        // test with client ID assigned by KafkaProducer
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
        props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
                props, new StringSerializer(), new StringSerializer());
        assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
        assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());

        // Cluster metadata will only be updated on calling onSend.
        Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());

        producer.close();
        assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
        assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
    } finally {
        // cleanup since we are using mutable static variables in MockProducerInterceptor
        MockProducerInterceptor.resetCounters();
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void testPartitionerClose() throws Exception {
    try {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
                props, new StringSerializer(), new StringSerializer());
        assertEquals(1, MockPartitioner.INIT_COUNT.get());
        assertEquals(0, MockPartitioner.CLOSE_COUNT.get());

        producer.close();
        assertEquals(1, MockPartitioner.INIT_COUNT.get());
        assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
    } finally {
        // cleanup since we are using mutable static variables in MockPartitioner
        MockPartitioner.resetCounters();
    }
}
Project: seldon-core    File: KafkaRequestResponseProducer.java
@Autowired
public KafkaRequestResponseProducer(@Value("${seldon.kafka.enable}") boolean kafkaEnabled) 
{
    if (kafkaEnabled)
    {
        enabled = true;
        String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
        logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
        if (kafkaHostPort == null) {
            logger.warn("*WARNING* SELDON_KAFKA_SERVER environment variable not set!");
            kafkaHostPort = "localhost:9093";
        }
        logger.info("Starting kafka client with server "+kafkaHostPort);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaHostPort);
        props.put("client.id", "RequestResponseProducer");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"20"); //NB need to investigate issues of Kafka not able to get metadata
        producer = new KafkaProducer<>(props, new StringSerializer(), new RequestResponseSerializer());
    }
    else
        logger.warn("Kafka not enabled");
}
Project: fluid    File: KafkaSinkTest.java
@Test
public void testSinkWithString() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    CountDownLatch latch = new CountDownLatch(1);
    List<String> values = new ArrayList<>();
    usage.consumeStrings(topic, 10, 10, TimeUnit.SECONDS,
        latch::countDown,
        (k, v) -> values.contains(v));

    KafkaSink<String> sink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", StringSerializer.class.getName())
            .put("value.deserializer", StringDeserializer.class.getName())
    );


    Stream<String> stream = new Random().longs(10).mapToObj(Long::toString);
    Source.fromPayloads(stream)
        .onPayload(values::add)
        .to(sink);

    assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}
Project: SkyEye    File: KafkaManager.java
public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final String zkServers, final String mail, final  String rpc,
                    final String app, final String host, final Property[] properties) {
    super(loggerContext, name);
    this.topic = topic;
    this.zkServers = zkServers;
    this.mail = mail;
    this.rpc = rpc;
    this.app = app;
    this.orginApp = app;
    this.host = host;
    this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Set the partitioner class: the custom KeyModPartitioner routes records with the same key to the same partition
    this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName());
    // Parameters from the XML configuration
    for (final Property property : properties) {
        this.config.put(property.getName(), property.getValue());
    }
    // For container deployments the host has to be supplied from outside
    this.config.put(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "log4j2");
}
Project: rmap    File: SimpleKafkaIT.java
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps,
            IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
    template.setDefaultTopic("topic3");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testAutoCommit");
}
Project: wombat    File: KafkaOutputter.java
public KafkaOutputter(String kafkaBroker, int kafkaPort,
        String kafkaTopicBase) {
    sortedMapper = new ObjectMapper();
    sortedMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

    unsortedMapper = new ObjectMapper();

    // Kafka producer
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBroker + ":" + kafkaPort);
    props.put("request.required.acks", "all");


    this.kafkaTopicBase = kafkaTopicBase;

    kafkaProducer = new KafkaProducer<String, String>(props,
            new StringSerializer(),
            new StringSerializer());

    responses = new ArrayList<Future<RecordMetadata>>();
}
Project: OBP-JVM    File: SimpleTransport.java
/**
 * Configure the kafka producer.
 *
 * @return props to use for producer when no properties file is given to ctor.
 */
protected Map<String, Object> producer()
{
  Map<String, Object> props = new HashMap<>();

  props.put("bootstrap.servers", "localhost:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", StringSerializer.class.getName());
  props.put("value.serializer", StringSerializer.class.getName());

  return props;
}
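
A sketch of how the returned map might be consumed (topic name and record contents are placeholders):

// Illustrative only: build a producer from the config map above.
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producer());
kafkaProducer.send(new ProducerRecord<>("example-topic", "key", "value"));
kafkaProducer.close();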
Project: kafka    File: KafkaProducerTest.java
@Test
public void testInterceptorConstructClose() throws Exception {
    try {
        Properties props = new Properties();
        // test with client ID assigned by KafkaProducer
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
        props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
                props, new StringSerializer(), new StringSerializer());
        Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
        Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());

        producer.close();
        Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
        Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
    } finally {
        // cleanup since we are using mutable static variables in MockProducerInterceptor
        MockProducerInterceptor.resetCounters();
    }
}
Project: vertx-kafka-client    File: AdminUtilsTest.java
@Test
public void testCreateTopic(TestContext ctx) throws Exception {
  final String topicName = "testCreateTopic";
  Properties config = kafkaCluster.useTo().getProducerProperties("the_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

  AdminUtils adminUtils = AdminUtils.create(vertx, zookeeperHosts, false);

  Async createAsync = ctx.async();

  adminUtils.createTopic(topicName, 1, 1,
    ctx.asyncAssertSuccess(
    res -> createAsync.complete())
  );

  createAsync.awaitSuccess(10000);

  Async deleteAsync = ctx.async();
  adminUtils.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> deleteAsync.complete()));
  deleteAsync.awaitSuccess(10000);
}
Project: vertx-kafka-client    File: AdminUtilsTest.java
@Test
public void testCreateTopicWithTooManyReplicas(TestContext ctx) throws Exception {
  final String topicName = "testCreateTopicWithTooManyReplicas";
  Properties config = kafkaCluster.useTo().getProducerProperties("the_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  Async async = ctx.async();

  AdminUtils adminUtils = AdminUtils.create(vertx, zookeeperHosts, true);

  adminUtils.createTopic(topicName, 1, 2,
    ctx.asyncAssertFailure(
      res -> {
        ctx.assertEquals("Replication factor: 2 larger than available brokers: 1.", res.getLocalizedMessage(),
          "Topic creation must fail: only one Broker present, but two replicas requested");
        async.complete();
      })
  );

  async.awaitSuccess(10000);
}
Project: vertx-kafka-client    File: AdminUtilsTest.java
@Test
public void testTopicExists(TestContext ctx) throws Exception {
  final String topicName = "testTopicExists";
  Properties config = kafkaCluster.useTo().getProducerProperties("the_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  Async createAsync = ctx.async();

  AdminUtils adminUtils = AdminUtils.create(vertx, zookeeperHosts, false);

  adminUtils.createTopic(topicName, 2, 1,
    ctx.asyncAssertSuccess(
      res -> createAsync.complete())
  );

  createAsync.awaitSuccess(10000);

  Async existsAndDeleteAsync = ctx.async(2);
  adminUtils.topicExists(topicName, ctx.asyncAssertSuccess(res -> existsAndDeleteAsync.countDown()));
  adminUtils.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> existsAndDeleteAsync.countDown()));

  existsAndDeleteAsync.awaitSuccess(10000);
}
Project: vertx-kafka-client    File: AdminUtilsTest.java
@Test
public void testTopicExistsNonExisting(TestContext ctx) throws Exception {
  final String topicName = "testTopicExistsNonExisting";
  Properties config = kafkaCluster.useTo().getProducerProperties("the_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  Async createAsync = ctx.async();

  AdminUtils adminUtils = AdminUtils.create(vertx, zookeeperHosts, true);

  adminUtils.topicExists(topicName, ctx.asyncAssertSuccess(res -> {
      ctx.assertFalse(res, "Topic must not exist");
      createAsync.complete();
    })
  );
  createAsync.awaitSuccess(10000);
}
Project: vertx-kafka-client    File: AdminUtilsTest.java
@Test
public void testDeleteTopic(TestContext ctx) throws Exception {
  final String topicName = "testDeleteTopic";
  Properties config = kafkaCluster.useTo().getProducerProperties("the_producer");
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  Async async = ctx.async();

  AdminUtils adminUtils = AdminUtils.create(vertx, zookeeperHosts, false);

  adminUtils.createTopic(topicName, 1, 1,
    ctx.asyncAssertSuccess(
      res -> async.complete())
  );

  async.awaitSuccess(10000);

  Async deleteAsync = ctx.async();
  adminUtils.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> deleteAsync.complete()));
  deleteAsync.awaitSuccess(10000);
}