Java Code Examples for org.apache.kafka.clients.producer.ProducerConfig

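ProducerConfig defines the configuration key constants for the Kafka producer client (bootstrap.servers, acks, the serializer classes, and so on). Referencing these constants instead of raw strings avoids typos and keeps code aligned with the client version. As a minimal sketch of the pattern the examples below share (broker address, topic, and record contents are placeholders):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("example-topic", "key", "value"));
}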
Project: flume-release-1.7.0    File: TestKafkaSink.java
@Test
public void testOldProperties() {
  KafkaSink kafkaSink = new KafkaSink();
  Context context = new Context();
  context.put("topic", "test-topic");
  context.put(OLD_BATCH_SIZE, "300");
  context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
  context.put(REQUIRED_ACKS_FLUME_KEY, "all");
  Configurables.configure(kafkaSink, context);

  Properties kafkaProps = kafkaSink.getKafkaProps();

  assertEquals(kafkaSink.getTopic(), "test-topic");
  assertEquals(kafkaSink.getBatchSize(), 300);
  assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
               "localhost:9092,localhost:9092");
  assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");

}
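For comparison, the same sink configured with Flume 1.7's current (non-deprecated) keys would look roughly as follows; the key names are taken from the Flume 1.7 KafkaSink documentation, so treat this as a sketch rather than a verbatim excerpt:

Context context = new Context();
context.put("kafka.topic", "test-topic");
context.put("flumeBatchSize", "300");
context.put("kafka.bootstrap.servers", "localhost:9092");
context.put("kafka.producer.acks", "all");
Configurables.configure(kafkaSink, context);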
Project: pepper-box    File: PepperBoxKafkaSampler.java
/**
 * Builds the default parameters and their values.
 *
 * @return the default Arguments for this sampler
 */
@Override
public Arguments getDefaultParameters() {

    Arguments defaultParameters = new Arguments();
    defaultParameters.addArgument(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ProducerKeys.BOOTSTRAP_SERVERS_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerKeys.ZOOKEEPER_SERVERS, ProducerKeys.ZOOKEEPER_SERVERS_DEFAULT);
    defaultParameters.addArgument(ProducerKeys.KAFKA_TOPIC_CONFIG, ProducerKeys.KAFKA_TOPIC_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeys.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeys.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.COMPRESSION_TYPE_CONFIG, ProducerKeys.COMPRESSION_TYPE_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.BATCH_SIZE_CONFIG, ProducerKeys.BATCH_SIZE_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.LINGER_MS_CONFIG, ProducerKeys.LINGER_MS_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.BUFFER_MEMORY_CONFIG, ProducerKeys.BUFFER_MEMORY_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.ACKS_CONFIG, ProducerKeys.ACKS_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.SEND_BUFFER_CONFIG, ProducerKeys.SEND_BUFFER_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerConfig.RECEIVE_BUFFER_CONFIG, ProducerKeys.RECEIVE_BUFFER_CONFIG_DEFAULT);
    defaultParameters.addArgument(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
    defaultParameters.addArgument(PropsKeys.MESSAGE_PLACEHOLDER_KEY, PropsKeys.MSG_PLACEHOLDER);
    defaultParameters.addArgument(ProducerKeys.KERBEROS_ENABLED, ProducerKeys.KERBEROS_ENABLED_DEFULAT);
    defaultParameters.addArgument(ProducerKeys.JAVA_SEC_AUTH_LOGIN_CONFIG, ProducerKeys.JAVA_SEC_AUTH_LOGIN_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerKeys.JAVA_SEC_KRB5_CONFIG, ProducerKeys.JAVA_SEC_KRB5_CONFIG_DEFAULT);
    defaultParameters.addArgument(ProducerKeys.SASL_KERBEROS_SERVICE_NAME, ProducerKeys.SASL_KERBEROS_SERVICE_NAME_DEFAULT);
    defaultParameters.addArgument(ProducerKeys.SASL_MECHANISM, ProducerKeys.SASL_MECHANISM_DEFAULT);
    return defaultParameters;
}
Project: testcontainers-java-module-confluent-platform    File: HelloProducer.java
public void createProducer(String bootstrapServer) {
  long numberOfEvents = 5;

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  KafkaProducer<String, String> producer = new KafkaProducer<>(
      props);

  for (int i = 0; i < numberOfEvents; i++) {
    String key = "testContainers";
    String value = "AreAwesome";
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "hello_world_topic", key, value);
    try {
      producer.send(record).get();
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    System.out.printf("key = %s, value = %s\n", key, value);
  }

  producer.close();
}
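The .get() after each send() above makes the loop fully synchronous. A non-blocking variant of the same send, using the producer's callback overload (same record and serializers assumed), could look like:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("partition = %d, offset = %d%n", metadata.partition(), metadata.offset());
    }
});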
Project: kmq    File: KmqClient.java
public KmqClient(KmqConfig config, KafkaClients clients,
                 Class<? extends Deserializer<K>> keyDeserializer,
                 Class<? extends Deserializer<V>> valueDeserializer,
                 long msgPollTimeout) {

    this.config = config;
    this.msgPollTimeout = msgPollTimeout;

    this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
    // Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
    this.markerProducer = clients.createProducer(
            MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
            Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));

    LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
    msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
}
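The partitioner class itself is not part of this excerpt. A hypothetical sketch of what a partitioner with this role might look like, assuming MarkerKey exposes the originating queue partition via a getPartition() accessor:

public class ParititionFromMarkerKey implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Assumed accessor: route the marker to the partition recorded in its key
        return ((MarkerKey) key).getPartition();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}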
Project: cruise-control    File: CruiseControlMetricsReporterSslTest.java
@Override
public Properties overridingProps() {
  Properties props = new Properties();
  int port = findLocalPort();
  // We need to convert all the properties to the Cruise Control properties.
  setSecurityConfigs(props, "producer");
  for (String configName : ProducerConfig.configNames()) {
    Object value = props.get(configName);
    if (value != null) {
      props.remove(configName);
      props.put(appendPrefix(configName), value);
    }
  }
  props.setProperty("metric.reporters", CruiseControlMetricsReporter.class.getName());
  props.setProperty("listeners", "SSL://127.0.0.1:" + port);
  props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
  props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), "SSL");
  props.setProperty(CRUISE_CONTROL_METRICS_REPORTING_INTERVAL_MS_CONFIG, "100");
  props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
  return props;
}
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Before
public void setUp() {
  super.setUp();
  Properties props = new Properties();
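  // "-1" is equivalent to "all": the leader waits for the full in-sync replica set to acknowledge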
  props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  AtomicInteger failed = new AtomicInteger(0);
  try (Producer<String, String> producer = createProducer(props)) {
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<>("TestTopic", Integer.toString(i)), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          if (e != null) {
            failed.incrementAndGet();
          }
        }
      });
    }
  }
  assertEquals(0, failed.get());
}
Project: ja-micro    File: KafkaPublisher.java
public void initialize(String servers) {
    if (isInitialized.get()) {
        logger.warn("Already initialized");
        return;
    }

    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    properties.forEach(props::put);

    realProducer = new KafkaProducer<>(props);
    isInitialized.set(true);
}
Project: oscm    File: Producer.java
public void produce(UUID key, Object value) {
    ConfigurationService configService = ServiceLocator
            .findService(ConfigurationService.class);
    Properties kafkaProps = new Properties();
    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configService.getVOConfigurationSetting(
                    ConfigurationKey.KAFKA_BOOTSTRAP_SERVERS, "global")
                    .getValue());

    this.producer = new KafkaProducer<>(kafkaProps, new UUIDSerializer(),
            new DataSerializer(value.getClass()));
    try {
        producer.send(new ProducerRecord<>(TOPIC, key, value));
    } catch (Exception e) {
        LOGGER.error("Producer closed");
        e.printStackTrace();
    } finally {
        producer.close();
        LOGGER.debug("Producer closed");
    }
}
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyData() {
    final String topic = "TestTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<String, String> producer = new KafkaProducer<>(config);
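    // Char codes 65 through 90 are 'A' through 'Z', so this publishes one record per uppercase letter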
    for (int charCode = 65; charCode < 91; charCode++) {
        final char[] key = new char[1];
        key[0] = (char) charCode;

        producer.send(new ProducerRecord<>(topic, new String(key), new String(key)));
    }
    producer.flush();
    producer.close();
}
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyDataNumbers() {
    final String topic = "NumbersTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config);
    for (int value = 0; value < 10000; value++) {
        producer.send(new ProducerRecord<>(topic, value, value));
    }
    producer.flush();
    producer.close();
}
Project: eventapis    File: Eventapis.java
public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-local:9092");
//        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
//                JsonSerializer.class);
//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
//                JsonSerializer.class);
        // maximum time to block, after which a TimeoutException is thrown
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
        AdminClient adminClient = AdminClient.create(props);
        adminClient.describeCluster();
        Collection<TopicListing> topicListings = adminClient.listTopics().listings().get();
        System.out.println(topicListings);
    }
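MAX_BLOCK_MS_CONFIG bounds how long a producer's send() (including the metadata fetch behind it) may block. If a producer were created from these props (with serializers configured, here assumed to be String), the timeout could be handled roughly like this sketch (topic and value are placeholders):

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(new ProducerRecord<>("example-topic", "value"));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    // max.block.ms (50 seconds here) elapsed while waiting for metadata or buffer space
    e.printStackTrace();
}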
Project: kafka-streams-example    File: Producer.java
public Producer() {
    LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
    Properties kafkaProps = new Properties();

    String defaultClusterValue = "localhost:9092";
    String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
    LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");

    this.kafkaProducer = new KafkaProducer<>(kafkaProps);

}
Project: stroom-query    File: KafkaLogbackAppender.java
public KafkaLogbackAppender(final Properties producerConfig,
                            final String topic) {
    this.topic = topic;

    // Build properties that can be used by the kafka producer
    this.producerConfig = new Properties();
    this.producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    this.producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
    this.producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    this.producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    this.producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringSerializer.class.getName());
    this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());

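    // Caller-supplied properties take precedence over the defaults set above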
    this.producerConfig.putAll(producerConfig);
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number -> new ProducerRecord<>(
                    TOPIC, //topic
                    number.toString(), //key
                    UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream
            .rangeClosed(1, 100000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: talk-kafka-messaging-logs    File: KafkaSlowProducer.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer.send(record);
            });
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaOffsetBackingStoreTest.java
@Test
public void testStartStop() throws Exception {
    expectConfigure();
    expectStart(Collections.EMPTY_LIST);
    expectStop();

    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
    assertEquals(TOPIC, capturedTopic.getValue());
    assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

    assertEquals(TOPIC, capturedNewTopic.getValue().name());
    assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
    assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());

    store.start();
    store.stop();

    PowerMock.verifyAll();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStoreTest.java
@Test
public void testStartStop() throws Exception {
    expectConfigure();
    expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
    expectStop();

    PowerMock.replayAll();

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);

    assertEquals(TOPIC, capturedTopic.getValue());
    assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

    assertEquals(TOPIC, capturedNewTopic.getValue().name());
    assertEquals(1, capturedNewTopic.getValue().numPartitions());
    assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
    configStorage.start();
    configStorage.stop();

    PowerMock.verifyAll();
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfig.java
/**
 * Get the configs for the {@link KafkaProducer producer}.
 * Properties using the prefix {@link #PRODUCER_PREFIX} will be used in favor of their non-prefixed versions
 * except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
 * version as we only support reading/writing from/to the same Kafka Cluster.
 *
 * @param clientId clientId
 * @return Map of the producer configuration.
 */
public Map<String, Object> getProducerConfigs(final String clientId) {
    final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());

    if (eosEnabled) {
        if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
            throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
                + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have idempotency enabled.");
        }

        if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
            throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
                + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have only one in-flight request per connection.");
        }
    }

    // generate producer configs from original properties and overridden maps
    final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
    props.putAll(clientProvidedProps);

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
    // add client id with stream client id prefix
    props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");

    return props;
}
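To use the prefixing described in the Javadoc from the application side, a producer-only setting can be scoped with StreamsConfig's producerPrefix helper (which applies PRODUCER_PREFIX); a small sketch:

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Applies to the internal producer only; consumers are unaffected
streamsProps.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "100");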
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: QueryableStateIntegrationTest.java
@Override
public void run() {
    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {

        while (getCurrIteration() < numIterations && !shutdown) {
            for (final String value : inputValues) {
                producer.send(new ProducerRecord<String, String>(topic, value));
            }
            incrementInteration();
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
Project: jkes    File: JkesKafkaProducer.java
@Inject
public JkesKafkaProducer(JkesProperties jkesProperties) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, jkesProperties.getKafkaBootstrapServers());
    props.put("acks", "all");
    props.put("retries", 1);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("documentBasePackage", jkesProperties.getDocumentBasePackage());

    // Use StringSerializer because in some cases the key is a String rather than a Long (e.g. MongoDB ids)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JkesKafkaJsonSerializer.class);

    this.producer = new KafkaProducer<>(props);
}
Project: seldon-core    File: KafkaRequestResponseProducer.java
@Autowired
public KafkaRequestResponseProducer(@Value("${seldon.kafka.enable}") boolean kafkaEnabled) 
{
    if (kafkaEnabled)
    {
        enabled = true;
        String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
        logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
        if (kafkaHostPort == null) {
            logger.warn("*WARNING* SELDON_KAFKA_SERVER environment variable not set!");
            kafkaHostPort = "localhost:9093";
        }
        logger.info("Starting kafka client with server "+kafkaHostPort);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaHostPort);
        props.put("client.id", "RequestResponseProducer");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"20"); //NB need to investigate issues of Kafka not able to get metadata
        producer = new KafkaProducer<>(props, new StringSerializer(), new RequestResponseSerializer());
    }
    else
        logger.warn("Kafka not enabled");
}
Project: cruise-control    File: KafkaSampleStore.java
@Override
public void configure(Map<String, ?> config) {
  _partitionMetricSampleStoreTopic = (String) config.get(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
  _brokerMetricSampleStoreTopic = (String) config.get(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
  if (_partitionMetricSampleStoreTopic == null
      || _brokerMetricSampleStoreTopic == null
      || _partitionMetricSampleStoreTopic.isEmpty()
      || _brokerMetricSampleStoreTopic.isEmpty()) {
    throw new IllegalArgumentException("The sample store topic names must be configured.");
  }
  String numProcessingThreadsString = (String) config.get(NUM_SAMPLE_LOADING_THREADS);
  int numProcessingThreads = numProcessingThreadsString == null || numProcessingThreadsString.isEmpty() ?
      8 : Integer.parseInt(numProcessingThreadsString);
  _metricProcessorExecutor = Executors.newFixedThreadPool(numProcessingThreads);
  _consumers = new ArrayList<>(numProcessingThreads);
  for (int i = 0; i < numProcessingThreads; i++) {
    _consumers.add(createConsumers(config));
  }
  Properties producerProps = new Properties();
  producerProps.putAll(config);
  producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
  producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);
  // Set batch.size and linger.ms to large values for better batching.
  producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
  producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "800000");
  producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
  producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
  producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  _producer = new KafkaProducer<>(producerProps);

  _loadingProgress = -1.0;

  ensureTopicCreated(config);
}
Project: cruise-control    File: CruiseControlMetricsReporter.java
@Override
public void configure(Map<String, ?> configs) {
  Properties producerProps = CruiseControlMetricsReporterConfig.parseProducerConfigs(configs);
  if (!producerProps.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
    String port = (String) configs.get("port");
    String bootstrapServers = "localhost:" + (port == null ? "9092" : port);
    producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    LOG.info("Using default value of {} for {}", bootstrapServers,
             CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
  }
  if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
    String securityProtocol = "PLAINTEXT";
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    LOG.info("Using default value of {} for {}", securityProtocol,
             CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
  }
  CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

  setIfAbsent(producerProps,
              ProducerConfig.CLIENT_ID_CONFIG,
              reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG)));
  // Set batch.size and linger.ms to large values for better batching.
  setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG, "30000");
  setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG, "800000");
  setIfAbsent(producerProps, ProducerConfig.RETRIES_CONFIG, "5");
  setIfAbsent(producerProps, ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
  setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
  setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all");
  _producer = new KafkaProducer<>(producerProps);

  _brokerId = Integer.parseInt((String) configs.get(KafkaConfig.BrokerIdProp()));

  _cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
  _reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTING_INTERVAL_MS_CONFIG);
}
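setIfAbsent is a project helper that is not part of this excerpt; presumably it is equivalent to something like:

private static void setIfAbsent(Properties props, String key, String value) {
    if (!props.containsKey(key)) {
        props.setProperty(key, value);
    }
}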
Project: mtgo-best-bot    File: BotOrchestratorApplication.java
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
}
Project: AthenaX    File: KafkaJsonConnectorITest.java
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  return new KafkaProducer<>(prop);
}
Project: AthenaX    File: ITestUtil.java
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  return new KafkaProducer<>(prop);
}
Project: open-kilda    File: MessageProducerConfig.java
/**
 * Kafka producer config bean.
 * This {@link Map} is used by {@link MessageProducerConfig#producerFactory}.
 *
 * @return kafka properties bean
 */
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return props;
}
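The producerFactory referenced in the Javadoc is not shown here; with spring-kafka it is typically wired roughly as follows (bean names assumed):

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}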
Project: open-kilda    File: AbstractTopology.java
private Properties makeKafkaProperties() {
    Properties kafka = new Properties();

    kafka.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafka.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafka.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHosts());
    kafka.setProperty(ConsumerConfig.GROUP_ID_CONFIG, getTopologyName());
    kafka.setProperty("request.required.acks", "1");

    return kafka;
}
Project: open-kilda    File: AbstractStormTest.java
protected static Properties kafkaProperties() throws ConfigurationException, CmdLineException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, makeUnboundConfig().getKafkaHosts());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("request.required.acks", "1");
    return properties;
}
Project: daf-replicate-ingestion    File: SenderConfig.java
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

    return props;
}