Java class org.apache.kafka.clients.producer.KafkaProducer usage examples (source code)

Project: kmq    File: StandaloneSender.java
public static void main(String[] args) throws InterruptedException, IOException {
    UncaughtExceptionHandling.setup();

    KafkaProducer<ByteBuffer, ByteBuffer> msgProducer = KAFKA_CLIENTS
            .createProducer(ByteBufferSerializer.class, ByteBufferSerializer.class);

    LOG.info("Sending ...");

    for(int i = 0; i < TOTAL_MSGS; i++) {
        ByteBuffer data = ByteBuffer.allocate(4).putInt(i);
        msgProducer.send(new ProducerRecord<>(KMQ_CONFIG.getMsgTopic(), data));
        try { Thread.sleep(100L); } catch (InterruptedException e) { throw new RuntimeException(e); }
        LOG.info(String.format("Sent message %d", i));
    }

    msgProducer.close();

    LOG.info("Sent");
}
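
Each send() above is fire-and-forget: a broker-side failure would never surface in this loop. A minimal variant, reusing KMQ_CONFIG and LOG from the snippet, that attaches a callback (the standard send(record, callback) overload) to log the per-record outcome:

msgProducer.send(new ProducerRecord<>(KMQ_CONFIG.getMsgTopic(), data), (metadata, exception) -> {
    if (exception != null) {
        LOG.error("Send failed", exception);  // delivery failed after client-side retries
    } else {
        LOG.info("Acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());
    }
});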
Project: apache-kafka-demos    File: PerformanceProducer.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ACKS_CONFIG, "all");
        props.put(RETRIES_CONFIG, 0);
        props.put(BATCH_SIZE_CONFIG, 32000);
        props.put(LINGER_MS_CONFIG, 100);
        props.put(BUFFER_MEMORY_CONFIG, 33554432);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");

        Producer<Long, Long> producer = new KafkaProducer<>(props);

        long t1 = System.currentTimeMillis();

        long i = 0;
        for(; i < 1000000; i++) {

            producer.send(new ProducerRecord<>("produktion", i, i));
        }
        producer.send(new ProducerRecord<Long,Long>("produktion", (long) -1, (long)-1));
        System.out.println("fertig " + i  + " Nachrichten in " + (System.currentTimeMillis() - t1 + " ms"));

        producer.close();
    }
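
Note that send() only enqueues records into the client buffer, so the elapsed time printed above mostly measures enqueueing, not delivery. A sketch of the same measurement with a flush() before stopping the clock, so buffered records are counted too:

long t1 = System.currentTimeMillis();
for (long i = 0; i < 1_000_000; i++) {
    producer.send(new ProducerRecord<>("produktion", i, i));
}
producer.flush();  // blocks until every buffered record has been acknowledged
System.out.println("finished: 1000000 messages in " + (System.currentTimeMillis() - t1) + " ms");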
Project: big-data-benchmark    File: RealTimeTradeProducer.java
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException,
        URISyntaxException {
    if (tradesPerSecond <= 0) {
        throw new RuntimeException("tradesPerSecond=" + tradesPerSecond);
    }
    this.index = index;
    this.topic = topic;
    this.tradesPerSecond = tradesPerSecond;
    tickers = new String[keysTo - keysFrom];
    Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom));
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("key.serializer", LongSerializer.class.getName());
    props.setProperty("value.serializer", TradeSerializer.class.getName());
    this.producer = new KafkaProducer<>(props);
}
Project: echo    File: KafkaFlowFilesProducer.java
@OnScheduled
public void onScheduled(final ProcessContext context) {
    try {
        topic = context.getProperty(TOPIC).getValue();
        brokerIP = context.getProperty(BROKERIP).getValue();
        props = new Properties();
        props.put("bootstrap.servers", brokerIP);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Project: doctorkafka    File: KafkaWriter.java
public static void main(String[] args) throws Exception {
  CommandLine commandLine = parseCommandLine(args);
  String zkUrl = commandLine.getOptionValue(ZOOKEEPER);
  String topic = commandLine.getOptionValue(TOPIC);
  int numMessages = Integer.parseInt(commandLine.getOptionValue(NUM_MESSAGES));

  Random random = new Random();
  Properties props = OperatorUtil.createKafkaProducerProperties(zkUrl);
  KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props);

  byte[] key = new byte[16];
  byte[] data = new byte[1024];
  for (int i = 0; i < numMessages; i++) {
    for (int j = 0; j < data.length; j++) {
      data[j] = (byte)random.nextInt();
    }
    ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord(
        topic, 0, System.currentTimeMillis(), key, data);
    Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
    future.get();
    if (i % 100 == 0) {
      System.out.println("Have wrote " + i + " messages to kafka");
    }
  }
}
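
Calling future.get() after every send() blocks on each acknowledgement, so records never batch and only one send is ever in flight. A hedged sketch that keeps a bounded window of pending sends instead, using the same fields as the snippet above:

List<Future<RecordMetadata>> pending = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
    ProducerRecord<byte[], byte[]> record =
        new ProducerRecord<>(topic, 0, System.currentTimeMillis(), key, data);
    pending.add(kafkaProducer.send(record));  // enqueue without blocking
    if (pending.size() >= 1000) {             // cap the number of outstanding sends
        for (Future<RecordMetadata> f : pending) {
            f.get();                          // surfaces any delivery failure
        }
        pending.clear();
    }
}
for (Future<RecordMetadata> f : pending) {
    f.get();                                  // drain the last partial window
}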
Project: flume-release-1.7.0    File: TestKafkaChannel.java
private void doTestNullKeyNoHeader() throws Exception {
  final KafkaChannel channel = startChannel(false);
  Properties props = channel.getProducerProps();
  KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);

  for (int i = 0; i < 50; i++) {
    ProducerRecord<String, byte[]> data =
        new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
    producer.send(data).get();
  }
  ExecutorCompletionService<Void> submitterSvc = new
          ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
  List<Event> events = pullEvents(channel, submitterSvc,
          50, false, false);
  wait(submitterSvc, 5);
  List<String> finals = new ArrayList<String>(50);
  for (int i = 0; i < 50; i++) {
    finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
  }
  for (int i = 0; i < 50; i++) {
    Assert.assertNull(finals.get(i));
  }
  channel.stop();
}
Project: nighthawk    File: KafkaProducerTest.java
@Test
public void test_create_tracing_serializer() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。
    props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
    props.put("retries", 3);// 请求失败重试的次数
    props.put("batch.size", 16384);// batch的大小
    props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
    props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1000; i++) {
        producer.send(new ProducerRecord<>("test", "hello", "kafka - " + i));
        Thread.sleep(10000);
    }
}
Project: Re-Collector    File: KafkaOutput.java
static Producer<String, String> getProducer(KafkaOutputConfiguration configuration) {
    if (producer == null) {
        synchronized (KafkaOutput.class) {
            if (producer != null) {
                return producer;
            }
            Properties props = new Properties();
            props.put("bootstrap.servers", configuration.getHost() + ":" + configuration.getPort());
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("request.required.acks", "0");
            props.put("batch.size", 64);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 1024);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
        return producer;
    } else {
        return producer;
    }
}
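
This double-checked locking is only safe if the producer field is declared volatile; without it, another thread may observe a partially constructed KafkaProducer. A conventional sketch of the same idiom, where buildProps() is a hypothetical helper wrapping the Properties block above:

private static volatile Producer<String, String> producer;  // volatile is what makes the pattern correct

static Producer<String, String> getProducer(KafkaOutputConfiguration configuration) {
    Producer<String, String> local = producer;  // one volatile read on the fast path
    if (local == null) {
        synchronized (KafkaOutput.class) {
            local = producer;
            if (local == null) {
                producer = local = new KafkaProducer<>(buildProps(configuration));
            }
        }
    }
    return local;
}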
Project: wechat-mall    File: OrderProducer.java
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    p.initTransactions();
    p.beginTransaction();
    Future<RecordMetadata> res = worker.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    RecordMetadata record;
    try {
        record = res.get();
        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException | ExecutionException e) {
        p.abortTransaction();
    }
}
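
Note that initTransactions() is intended to be called once per producer instance (which must have transactional.id set in its config), right after construction, not on every send; repeating it as above fails after the first transaction. A hedged sketch of the usual lifecycle, reusing the names from this snippet:

// once, right after creating the producer (requires transactional.id in its config)
p.initTransactions();

// then, per unit of work
p.beginTransaction();
try {
    p.send(new ProducerRecord<>(topic, k, v)).get();
    p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
    p.commitTransaction();
} catch (Exception e) {
    p.abortTransaction();
}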
Project: spark-cassandra-poc    File: KafkaDataProducer.java
public void sendData(String data) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        System.out.println(metrics);

        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("video_view", data));

        producer.close();

    }
Project: open-kilda    File: Original.java
@Test
public void shouldWriteThenRead() throws Exception {

    //Create a consumer
    ConsumerIterator<String, String> it = buildConsumer(Original.topic);

    //Create a producer
    producer = new KafkaProducer<>(producerProps());

    //send a message
    producer.send(new ProducerRecord<>(Original.topic, "message")).get();

    //read it back
    MessageAndMetadata<String, String> messageAndMetadata = it.next();
    String value = messageAndMetadata.message();
    assertThat(value, is("message"));
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: oryx2    File: ProduceData.java
public void start() throws InterruptedException {
  RandomGenerator random = RandomManager.getRandom();

  Properties props = ConfigUtils.keyValueToProperties(
      "bootstrap.servers", "localhost:" + kafkaPort,
      "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "compression.type", "gzip",
      "batch.size", 0,
      "acks", 1,
      "max.request.size", 1 << 26 // TODO
  );
  try (Producer<String,String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < howMany; i++) {
      Pair<String,String> datum = datumGenerator.generate(i, random);
      ProducerRecord<String,String> record =
          new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond());
      producer.send(record);
      log.debug("Sent datum {} = {}", record.key(), record.value());
      if (intervalMsec > 0) {
        Thread.sleep(intervalMsec);
      }
    }
  }
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: DBus    File: KafkaConsumerEvent.java
public KafkaConsumerEvent(String topic) {
    super(0L);
    this.topic = topic;
    Properties props = HeartBeatConfigContainer.getInstance().getKafkaConsumerConfig();
    Properties producerProps = HeartBeatConfigContainer.getInstance().getKafkaProducerConfig();
    try {
        dataConsumer = new KafkaConsumer<>(props);
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));
        KafkaConsumerContainer.getInstances().putConsumer(this.topic, dataConsumer);

        statProducer = new KafkaProducer<>(producerProps);
    } catch (Exception e) {
        e.printStackTrace();
    }
    startTime = System.currentTimeMillis();
}
Project: oryx2    File: TopicProducerImpl.java
private synchronized Producer<K,M> getProducer() {
  // Lazy init; also handles case where object has been serialized and Producer
  // needs to be recreated
  if (producer == null) {
    producer = new KafkaProducer<>(ConfigUtils.keyValueToProperties(
        "bootstrap.servers", updateBroker,
        "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
        "linger.ms", 1000, // Make configurable?
        "batch.size", async ? 1 << 14 : 0,
        "compression.type", "gzip",
        "acks", 1,
        "max.request.size", 1 << 26 // TODO
    ));
  }
  return producer;
}
Project: kafka-streams-example    File: Producer.java
public Producer() {
    LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
    Properties kafkaProps = new Properties();

    String defaultClusterValue = "localhost:9092";
    String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
    LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");

    this.kafkaProducer = new KafkaProducer<>(kafkaProps);

}
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: DemoProducer.java
public static void main(final String[] args) {
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("acks", "all");
    producerProps.put("retries", 1);
    producerProps.put("batch.size", 20000);
    producerProps.put("linger.ms", 1);
    producerProps.put("buffer.memory", 24568545);
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);

    for (int i = 0; i < 2000; i++) {
        ProducerRecord<String, String> data = new ProducerRecord<>("test1", "Hello this is record " + i);
        Future<RecordMetadata> recordMetadata = producer.send(data);
    }
    producer.close();
}
Project: kafka-docker-demo    File: ProducerDemo.java
public static void main(String[] args) throws Exception {

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.77.7:9094,192.168.77.7:9093,192.168.77.7:9092");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for(int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test", Long.toString(System.currentTimeMillis()), Integer.toString(i)));
            System.out.println("Sent message: " + i);
        }

        producer.close();
    }
Project: spark2.0    File: KafkaSendMessage.java
public static void  sendStringMessage() throws Exception{
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

    // no explicit partition: the topic has a single partition by default; send the messages
    int i=0;
    while(i<1000){
        Thread.sleep(1000L);
        String message = "zhangsan"+i;
        producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message));
        i++;
        producer.flush();
    }
    producer.close();
}
Project: video-stream-analytics    File: VideoStreamCollector.java
public static void main(String[] args) throws Exception {

    // set producer properties
    Properties prop = PropertyFileReader.readPropertyFile();    
    Properties properties = new Properties();
    properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));
    properties.put("acks", prop.getProperty("kafka.acks"));
    properties.put("retries",prop.getProperty("kafka.retries"));
    properties.put("batch.size", prop.getProperty("kafka.batch.size"));
    properties.put("linger.ms", prop.getProperty("kafka.linger.ms"));
    properties.put("max.request.size", prop.getProperty("kafka.max.request.size"));
    properties.put("compression.type", prop.getProperty("kafka.compression.type"));
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // generate event
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    generateIoTEvent(producer,prop.getProperty("kafka.topic"),prop.getProperty("camera.id"),prop.getProperty("camera.url"));
}
Project: iothub    File: MsgProducer.java
@PostConstruct
public void init() {
  properties.put("bootstrap.servers", "127.0.0.1:9092");
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  properties.put("acks", "-1");
  properties.put("retries", 0);
  properties.put("batch.size", 16384);
  properties.put("linger.ms", 0);
  properties.put("buffer.memory", 33554432);
  try {
    this.producer = new KafkaProducer<>(properties);
  } catch (Exception e) {
    log.error("Failed to start kafka producer", e);
    throw new RuntimeException(e);
  }
  log.info("Kafka Producer is started....");
}
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: IPLogProducer.java
@Override
public void run() {
    PropertyReader propertyReader = new PropertyReader();

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", propertyReader.getPropertyValue("broker.list"));
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("auto.create.topics.enable", "true");

    KafkaProducer<String, String> ipProducer = new KafkaProducer<String, String>(producerProps);

    BufferedReader br = readFile();
    String oldLine = "";
    try {
        while ((oldLine = br.readLine()) != null) {
            String line = getNewRecordWithRandomIP(oldLine).replace("[", "").replace("]", "");
            ProducerRecord<String, String> ipData = new ProducerRecord<>(propertyReader.getPropertyValue("topic"), line);
            Future<RecordMetadata> recordMetadata = ipProducer.send(ipData);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    ipProducer.close();
}
Project: spring-tutorial    File: SampleProducer.java
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> sampleProducer = new KafkaProducer<>(props);

//      ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, value);       
//      sampleProducer.send(record);
        for (int i = 0; i < 10; i++)
            sampleProducer.send(new ProducerRecord<String, String>("demo-topic1","Data:"+ Integer.toString(i)));
        sampleProducer.close();
    }
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            number, //Key
                            String.format("record-%s", number))) //Value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: j1st-mqtt    File: KafkaCommunicator.java
protected void init(AbstractConfiguration config) {
    BROKER_TOPIC_PREFIX = config.getString("communicator.broker.topic");
    APPLICATION_TOPIC = config.getString("communicator.application.topic");

    logger.trace("Initializing Kafka producer ...");

    // producer config
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("acks", config.getString("acks"));
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", InternalMessageSerializer.class.getName());

    // producer
    this.producer = new KafkaProducer<>(props);

    // consumer executor
    this.executor = Executors.newSingleThreadExecutor();
}
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyDataNumbers() {
    final String topic = "NumbersTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config);
    for (int value = 0; value < 10000; value++) {
        producer.send(new ProducerRecord<>(topic, value, value));
    }
    producer.flush();
    producer.close();
}
Project: flume-release-1.7.0    File: KafkaSourceEmbeddedKafka.java
private void initProducer() {
  Properties props = new Properties();
  props.put("bootstrap.servers", HOST + ":" + serverPort);
  props.put("acks", "1");
  producer = new KafkaProducer<String,byte[]>(props,
          new StringSerializer(), new ByteArraySerializer());
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaBasedLog.java
private Producer<K, V> createProducer() {
    // Always require producer acks to all to ensure durable writes
    producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");

    // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
    producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    return new KafkaProducer<>(producerConfigs);
}
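
The settings above make writes durable (acks=all) and keep them ordered under retries (one in-flight request). A hedged extension, not what this project does, assuming a client that supports enable.idempotence (0.11+): idempotence additionally prevents a batch retried after a network error from being written twice.

producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // broker deduplicates retried batches
return new KafkaProducer<>(producerConfigs);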
Project: Practical-Real-time-Processing-and-Analytics    File: DataGenerator.java
public static void main(String args[]) {
    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
    int counter = 0;
    int nbrOfEventsRequired = Integer.parseInt(args[0]);
    while (counter < nbrOfEventsRequired) {
        StringBuffer stream = new StringBuffer();

        long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L,
                9999999999L);
        int bin = ThreadLocalRandom.current().nextInt(100000, 9999999);
        int bout = ThreadLocalRandom.current().nextInt(100000, 9999999);

        stream.append(phoneNumber);
        stream.append(",");
        stream.append(bin);
        stream.append(",");
        stream.append(bout);
        stream.append(",");
        stream.append(System.currentTimeMillis());

        System.out.println(stream.toString());
        ProducerRecord<Integer, String> data = new ProducerRecord<Integer, String>(
                "device-data", stream.toString());
        producer.send(data);
        counter++;
    }

    producer.close();
}
Project: Practical-Real-time-Processing-and-Analytics    File: DataGenerator.java
public static void main(String args[]) {
    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
    int counter = 0;
    int nbrOfEventsRequired = Integer.parseInt(args[0]);
    while (counter < nbrOfEventsRequired) {
        StringBuffer stream = new StringBuffer();

        long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L,
                9999999960L);
        int bin = ThreadLocalRandom.current().nextInt(1000, 9999);
        int bout = ThreadLocalRandom.current().nextInt(1000, 9999);

        stream.append(phoneNumber);
        stream.append(",");
        stream.append(bin);
        stream.append(",");
        stream.append(bout);
        stream.append(",");
        stream.append(new Date(ThreadLocalRandom.current().nextLong()));

        System.out.println(stream.toString());
        ProducerRecord<Integer, String> data = new ProducerRecord<Integer, String>(
                "storm-trident-diy", stream.toString());
        producer.send(data);
        counter++;
    }

    producer.close();
}
Project: bullet-kafka    File: KafkaPubSub.java
/**
 * Get all partitions for a given topic.
 *
 * @param dummy A {@link KafkaProducer} used only for the metadata lookup; this method closes it.
 * @param topicName The topic to get partitions for.
 * @return {@link List} of {@link TopicPartition} values corresponding to the topic.
 */
List<TopicPartition> getAllPartitions(KafkaProducer<String, byte[]> dummy, String topicName) {
    List<TopicPartition> partitions = dummy.partitionsFor(topicName)
                                           .stream().map(i -> new TopicPartition(i.topic(), i.partition()))
                                           .collect(Collectors.toList());
    dummy.close();
    return partitions;
}
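
Note that the helper closes the producer it is handed, so the caller must treat it as consumed. A hypothetical usage sketch (producerProps() and the topic name are invented for illustration):

KafkaProducer<String, byte[]> metadataProducer = new KafkaProducer<>(producerProps());
List<TopicPartition> partitions = getAllPartitions(metadataProducer, "bullet.queries");
// metadataProducer is now closed; create a fresh producer for actual sends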
Project: Practical-Real-time-Processing-and-Analytics    File: VehicleStartPointGenerator.java
private static KafkaProducer<Integer, String> configureKafka() {
    Properties properties = new Properties();

    properties.put("bootstrap.servers", BROKER_1_CONNECTION_STRING);
    properties.put("key.serializer", StringSerializer.class.getName());
    properties.put("value.serializer", StringSerializer.class.getName());
    properties.put("auto.offset.reset", "smallest");
    properties.put("acks", "1");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
    return producer;
}
Project: apache-kafka-demos    File: SimpleProducer.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ACKS_CONFIG, "all");
        props.put(RETRIES_CONFIG, 0);
        props.put(BATCH_SIZE_CONFIG, 25000); // don't wait
        props.put(LINGER_MS_CONFIG, 200);
        props.put(BUFFER_MEMORY_CONFIG, 33554432);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        StringBuffer muellBuffer = new StringBuffer();
        for(int j = 0; j < 100000; j++) {
            muellBuffer.append(j);
        }
        String muell = muellBuffer.toString();

        Producer<String, String> producer = new KafkaProducer<>(props);

        System.out.println("Start sending!");
        for(int i = 1; i <= 10000; i++) {
            int key = i % 10;

            producer.send(new ProducerRecord<>("produktion", Integer.toString(i % 12), muell));

            if(i % 500 == 0) {
                Thread.sleep(1000);
                System.out.println("i = " + i);
            }
        }
        System.out.println("done!");

        producer.close();
    }
Project: kalinka    File: KafkaProducerClient.java
public KafkaProducerClient(final List<String> clients, final long intervalInMillis) {
    this.clients = clients;
    this.intervalInMillis = intervalInMillis;
    final Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", kafkaHosts);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<>(props);
}
Project: Practical-Real-time-Processing-and-Analytics    File: DataGenerator.java
public static void main(String args[]) {
    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
    int counter = 0;
    int nbrOfEventsRequired = Integer.parseInt(args[0]);
    while (counter < nbrOfEventsRequired) {
        StringBuffer stream = new StringBuffer();

        long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L,
                9999999999L);
        int bin = ThreadLocalRandom.current().nextInt(1000, 9999);
        int bout = ThreadLocalRandom.current().nextInt(1000, 9999);

        stream.append(phoneNumber);
        stream.append(",");
        stream.append(bin);
        stream.append(",");
        stream.append(bout);
        stream.append(",");
        stream.append(new Date(ThreadLocalRandom.current().nextLong()));

        System.out.println(stream.toString());
        ProducerRecord<Integer, String> data = new ProducerRecord<Integer, String>(
                "storm-diy", stream.toString());
        producer.send(data);
        counter++;
    }

    producer.close();
}
Project: oryx2    File: TopicProducerImpl.java
private synchronized Producer<K,M> getProducer() {
  // Lazy init
  if (producer == null) {
    producer = new KafkaProducer<>(ConfigUtils.keyValueToProperties(
        "bootstrap.servers", updateBroker,
        "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
        "linger.ms", 1000, // Make configurable?
        "compression.type", "gzip",
        "acks", 1,
        "max.request.size", 1 << 26 // TODO
    ));
  }
  return producer;
}
Project: eventapis    File: KafkaOperationRepositoryFactory.java
public KafkaOperationRepository createKafkaOperationRepository(ObjectMapper objectMapper) {
    KafkaProducer<String, Operation> operationsKafka = new KafkaProducer<>(
            kafkaProperties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>(objectMapper));
    KafkaProducer<String, PublishedEventWrapper> eventsKafka = new KafkaProducer<>(kafkaProperties.buildProducerProperties(),
            new StringSerializer(), new JsonSerializer<>(objectMapper));
    return new KafkaOperationRepository(operationsKafka, eventsKafka, kafkaProperties.getConsumer().getGroupId());
}