/**
 * Sends {@code record} through the wrapped producer with tracing applied.
 *
 * <p>A scope is obtained from {@code buildAndInjectSpan(record)}, the caller's callback is
 * wrapped in a {@code TracingCallback} together with that scope, and the scope is closed by
 * the try-with-resources statement once the delegate's {@code send} call has returned.
 *
 * @param record the record to send
 * @param callback the caller's completion callback, handed to {@code TracingCallback}
 * @return the future returned by the wrapped producer
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // A defensive copy of the record (headers can be read-only when a record is sent a
    // second time) used to be created here; that code is currently disabled.
    try (Scope activeScope = buildAndInjectSpan(record)) {
        return producer.send(record, new TracingCallback(callback, activeScope));
    }
}
/**
 * Publishes two records to {@code TOPIC2} and expects their values back, in order, from a
 * {@code String}-typed subscription on the same topic. Disabled via {@code @Ignore}.
 */
@Ignore
@Test
public void testWithString() throws Exception {
    KafkaComponent component = createComponent();

    // Subscribe first: collect the first two values that arrive on the topic.
    Mono<List<String>> firstTwo = Flux.from(component.from(TOPIC2, String.class))
        .take(2)
        .collectList();

    Subscriber<ProducerRecord> sink = component.to(TOPIC2, ProducerRecord.class);
    ProducerRecord<String, String> first = new ProducerRecord<String, String>(TOPIC2, "1", "test");
    ProducerRecord<String, String> second = new ProducerRecord<String, String>(TOPIC2, "1", "test2");
    Flux.just(first, second).subscribe(sink);

    // Wait at most ten seconds for both values.
    List<String> values = firstTwo.block(Duration.ofSeconds(10));
    Assert.assertEquals(2, values.size());
    Assert.assertEquals("test", values.get(0));
    Assert.assertEquals("test2", values.get(1));
}
/**
 * Publishes two records to {@code TOPIC1} and expects their values back, in order, from a
 * {@code ConsumerRecord}-typed subscription on the same topic. Disabled via {@code @Ignore}.
 *
 * <p>Both received values are verified, matching the checks in {@code testWithString}.
 */
@Ignore
@Test
public void testWithConsumerRecord() throws Exception {
    KafkaComponent kafka = createComponent();

    // Subscribe first: map each consumed record to its value and collect the first two.
    Mono<List<Object>> receive = Flux.from(kafka.from(TOPIC1, ConsumerRecord.class))
        .map(record -> record.value())
        .take(2)
        .collectList();

    Subscriber<ProducerRecord> toTopic = kafka.to(TOPIC1, ProducerRecord.class);
    Flux.just(new ProducerRecord<String, String>(TOPIC1, "1", "test"),
              new ProducerRecord<String, String>(TOPIC1, "1", "test2"))
        .subscribe(toTopic);

    // Wait at most ten seconds for both values.
    List<Object> received = receive.block(Duration.ofSeconds(10));
    Assert.assertEquals(2, received.size());
    Assert.assertEquals("test", received.get(0));
    // Previously only the first element was checked; the second one is verified as well.
    Assert.assertEquals("test2", received.get(1));
}
public void run() { int messageNo = 1; while (true) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } ++messageNo; } }
/**
 * Hands {@code record} to {@code producer} and routes any delivery failure to
 * {@code failedDeliveryCallback}.
 *
 * @param producer the producer to send with
 * @param record the record to send
 * @param event the event passed back to the failure callback
 * @param failedDeliveryCallback notified when the completion callback reports an exception or
 *        when the send call throws {@code BufferExhaustedException}
 * @return {@code true} if the send call returned normally, {@code false} if it threw
 *         {@code BufferExhaustedException}
 */
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    final Callback reportFailure = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                return;
            }
            failedDeliveryCallback.onFailedDelivery(event, exception);
        }
    };

    boolean accepted;
    try {
        producer.send(record, reportFailure);
        accepted = true;
    } catch (BufferExhaustedException bufferFull) {
        failedDeliveryCallback.onFailedDelivery(event, bufferFull);
        accepted = false;
    }
    return accepted;
}
public void start() throws InterruptedException { RandomGenerator random = RandomManager.getRandom(); Properties props = ConfigUtils.keyValueToProperties( "bootstrap.servers", "localhost:" + kafkaPort, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "compression.type", "gzip", "batch.size", 0, "acks", 1, "max.request.size", 1 << 26 // TODO ); try (Producer<String,String> producer = new KafkaProducer<>(props)) { for (int i = 0; i < howMany; i++) { Pair<String,String> datum = datumGenerator.generate(i, random); ProducerRecord<String,String> record = new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond()); producer.send(record); log.debug("Sent datum {} = {}", record.key(), record.value()); if (intervalMsec > 0) { Thread.sleep(intervalMsec); } } } }
/**
 * Proxy handler that rewrites topic names before delegating to the wrapped {@code producer}.
 *
 * <p>For the send method the first argument is replaced by a new {@code ProducerRecord} whose
 * topic went through {@code BaseIntegrationTest.adjustTopicNameForTest}; for the
 * partitions-for method the topic-name argument is adjusted the same way. Every other method
 * is forwarded unchanged.
 *
 * <p>Exceptions thrown by the delegate are unwrapped from the reflective
 * {@code InvocationTargetException} so that callers of the proxy see the original exception
 * rather than a reflection wrapper.
 *
 * @param proxy the proxy instance (not used)
 * @param method the interface method being invoked
 * @param args the call arguments; the first element may be replaced in place
 * @return whatever the delegate method returns
 * @throws Throwable the exception thrown by the delegate, if any
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    switch (method.getName()) {
        case SEND_METHOD_NAME: {
            ProducerRecord record = (ProducerRecord) args[0];
            // NOTE(review): only topic, partition, timestamp, key and value are copied;
            // anything else carried by the original record is not — confirm this is intended.
            args[0] = new ProducerRecord<>(
                BaseIntegrationTest.adjustTopicNameForTest(record.topic()),
                record.partition(),
                record.timestamp(),
                record.key(),
                record.value()
            );
            break;
        }
        case PARTITIONS_FOR_METHOD_NAME: {
            args[0] = BaseIntegrationTest.adjustTopicNameForTest((String) args[0]);
            break;
        }
    }
    try {
        return method.invoke(producer, args);
    } catch (java.lang.reflect.InvocationTargetException e) {
        // Surface the delegate's own exception instead of the reflection wrapper.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
    }
}
/**
 * Sends a single message synchronously through {@code LatestProducer} and prints the
 * partition and offset it was written to.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    Map<String, Object> producerSettings = new HashMap<String, Object>();
    producerSettings.put("partitioner.class", "com.wngn.kafka.SimpleKeyPartition");
    LatestProducer producer =
        LatestProducer.getInstance(ProducerConstants.TOPIC_KAFKA_TEST, producerSettings);

    long sequence = 0L;
    // The flag is cleared on entry, so the body runs exactly once.
    boolean keepGoing = true;
    while (keepGoing) {
        keepGoing = false;
        sequence++;
        System.out.println(sequence + "------------");
        try {
            RecordMetadata ack = producer.sendWithSync("1", "message_" + sequence);
            System.out.format("PARTITION: %d OFFSET: %d\n", ack.partition(), ack.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.close();
}
/**
 * A store created with {@code createStore(false, false)} must not hand any record to the
 * record collector when a value is put.
 */
@Test
public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception {
    store = createStore(false, false);

    // Collector that records everything it is asked to send.
    final List<ProducerRecord> captured = new ArrayList<>();
    final NoOpRecordCollector capturingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> sent = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            captured.add(sent);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  capturingCollector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);

    store.put("a", "b");

    assertTrue(captured.isEmpty());
}
public ConsumerRecords<K, V> nextBatch() { List<Future<RecordMetadata>> markerSends = new ArrayList<>(); // 1. Get messages from topic, in batches ConsumerRecords<K, V> records = msgConsumer.poll(msgPollTimeout); for (ConsumerRecord<K, V> record : records) { // 2. Write a "start" marker. Collecting the future responses. markerSends.add(markerProducer.send( new ProducerRecord<>(config.getMarkerTopic(), MarkerKey.fromRecord(record), new StartMarker(config.getMsgTimeoutMs())))); } // Waiting for a confirmation that each start marker has been sent markerSends.forEach(f -> { try { f.get(); } catch (Exception e) { throw new RuntimeException(e); } }); // 3. after all start markers are sent, commit offsets. This needs to be done as close to writing the // start marker as possible, to minimize the number of double re-processed messages in case of failure. msgConsumer.commitSync(); return records; }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
/**
 * Encodes {@code brokerStats} with the Avro writer and sends the bytes to {@code destTopic},
 * waiting for the send to complete.
 *
 * <p>The partition is {@code brokerStats.getId()} modulo the topic's partition count and the
 * key is the stats name followed by the current time in milliseconds. A success or failure
 * counter is incremented accordingly.
 *
 * @param brokerStats the stats to publish
 * @throws IOException declared by the signature
 * @throws RuntimeException wrapping any exception raised while encoding or sending
 */
public void publish(BrokerStats brokerStats) throws IOException {
    try {
        // Serialize the stats into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        BinaryEncoder encoder = avroEncoderFactory.binaryEncoder(buffer, null);
        avroEventWriter.write(brokerStats, encoder);
        encoder.flush();
        IOUtils.closeQuietly(buffer);

        String recordKey = brokerStats.getName() + "_" + System.currentTimeMillis();
        int partitionCount = kafkaProducer.partitionsFor(destTopic).size();
        int targetPartition = brokerStats.getId() % partitionCount;

        // Send and block until the broker has answered.
        Future<RecordMetadata> pendingSend = kafkaProducer.send(
            new ProducerRecord(destTopic, targetPartition, recordKey.getBytes(), buffer.toByteArray()));
        pendingSend.get();

        OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME);
    } catch (Exception e) {
        LOG.error("Failure in publish stats", e);
        OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME);
        throw new RuntimeException("Avro serialization failure", e);
    }
}
public static void sendStringMessage() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); //没有任何分区,默认1个分区,发送消息 int i=0; while(i<1000){ Thread.sleep(1000L); String message = "zhangsan"+i; producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message)); i++; producer.flush(); } producer.close(); }
/**
 * Attaches two independent pipelines to {@code source}, produces the integers 0..9 to
 * {@code topic} and verifies that each pipeline received every value incremented by one.
 *
 * @param usage helper used to produce the integers
 * @param topic the topic the integers are written to
 * @param source the source under test
 */
private void checkMulticast(KafkaUsage usage, String topic, KafkaSource<Integer> source) {
    List<Integer> seenByA = new ArrayList<>();
    List<Integer> seenByB = new ArrayList<>();

    // Two separate subscriptions on the same source; B is attached before A.
    source
        .transformPayload(i -> i + 1)
        .to(Sink.forEachPayload(seenByB::add));
    source
        .transformPayload(i -> i + 1)
        .to(Sink.forEachPayload(seenByA::add));

    AtomicInteger nextValue = new AtomicInteger();
    usage.produceIntegers(10, null,
        () -> new ProducerRecord<>(topic, nextValue.getAndIncrement()));

    // Give each pipeline up to a minute to see all ten values.
    await().atMost(1, TimeUnit.MINUTES).until(() -> seenByA.size() >= 10);
    await().atMost(1, TimeUnit.MINUTES).until(() -> seenByB.size() >= 10);

    assertThat(seenByA).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    assertThat(seenByB).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
/**
 * Produces the integers 0..9 to a randomly named topic and verifies that a
 * {@code KafkaSource} on that topic delivers each of them, incremented by one, in order.
 */
@Test
public void testSource() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    List<Integer> received = new ArrayList<>();

    KafkaSource<Integer> source = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );
    source
        .transformPayload(i -> i + 1)
        .to(Sink.forEachPayload(received::add));

    AtomicInteger nextValue = new AtomicInteger();
    usage.produceIntegers(10, null,
        () -> new ProducerRecord<>(topic, nextValue.getAndIncrement()));

    // Wait up to a minute for all ten values to arrive.
    await().atMost(1, TimeUnit.MINUTES).until(() -> received.size() >= 10);
    assertThat(received).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
/**
 * A store created with {@code createStore(true, false)} must hand at least one record to the
 * record collector when a value is put.
 */
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
    store = createStore(true, false);

    // Collector that records everything it is asked to send.
    final List<ProducerRecord> captured = new ArrayList<>();
    final NoOpRecordCollector capturingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> sent = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            captured.add(sent);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  capturingCollector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);

    store.put("a", "b");

    assertFalse(captured.isEmpty());
}
/**
 * Sends one record to {@code topic} inside a producer transaction.
 *
 * <p>The record is sent through the same producer instance on which the transaction was
 * begun (previously it went through the {@code worker} field while the transaction was
 * managed on the instance returned by {@code getWorker()}). After the send is acknowledged,
 * the record's partition and offset are added to the transaction and the transaction is
 * committed; if waiting for the acknowledgement fails, the transaction is aborted.
 *
 * @param k the record key
 * @param v the record value
 */
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    // NOTE(review): initTransactions() runs on every call; the Kafka docs describe it as a
    // one-time call per producer — confirm whether getWorker() returns a fresh producer.
    p.initTransactions();
    p.beginTransaction();
    Future<RecordMetadata> res = p.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    try {
        RecordMetadata record = res.get();
        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()),
            new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException e) {
        p.abortTransaction();
        // Keep the interrupt visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        p.abortTransaction();
    }
}
@Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator<String, String> it = buildConsumer(Original.topic); //Create a producer producer = new KafkaProducer<>(producerProps()); //send a message producer.send(new ProducerRecord<>(Original.topic, "message")).get(); //read it back MessageAndMetadata<String, String> messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); }
@Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator<String, String> it = buildConsumer(SimpleKafkaTest.topic); //Create a producer producer = new KafkaProducer<>(producerProps()); //send a message producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get(); //read it back MessageAndMetadata<String, String> messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); }
/**
 * Sends 10000 records to the {@code visits} topic. Keys cycle through the addresses
 * {@code 127.0.0.0} .. {@code 127.0.0.9}; each value is the current time in milliseconds
 * plus the loop index.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    Properties settings = new Properties();
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    settings.put("linger.ms", 0);

    KafkaProducer<String, Long> producer = new KafkaProducer<>(settings);
    int i = 0;
    while (i < 10000) {
        String ip = "127.0.0." + i % 10;
        System.out.println(ip);
        ProducerRecord<String, Long> visit =
            new ProducerRecord<>("visits", ip, System.currentTimeMillis() + i);
        producer.send(visit);
        i++;
    }
    producer.close();
}
/**
 * Reads the input file line by line, rewrites each line with
 * {@code getNewRecordWithRandomIP}, strips square brackets and sends the result to the topic
 * named by the {@code topic} property.
 *
 * <p>The reader is closed when the loop ends and the producer is closed in a
 * {@code finally} block, so neither is leaked when reading or sending fails.
 */
@Override
public void run() {
    PropertyReader propertyReader = new PropertyReader();

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", propertyReader.getPropertyValue("broker.list"));
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("auto.create.topics.enable", "true");

    KafkaProducer<String, String> ipProducer = new KafkaProducer<String, String>(producerProps);
    // try-with-resources closes the reader; a failure on close is reported like a read error.
    try (BufferedReader br = readFile()) {
        String oldLine;
        while ((oldLine = br.readLine()) != null) {
            String line = getNewRecordWithRandomIP(oldLine).replace("[", "").replace("]", "");
            ProducerRecord<String, String> ipData =
                new ProducerRecord<String, String>(propertyReader.getPropertyValue("topic"), line);
            ipProducer.send(ipData);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        ipProducer.close();
    }
}
/**
 * A store created with {@code createStore(true, false)} must hand at least one record to the
 * record collector when a value is put.
 */
@Test
public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception {
    store = createStore(true, false);

    // Collector that records everything it is asked to send.
    final List<ProducerRecord> captured = new ArrayList<>();
    final NoOpRecordCollector capturingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> sent = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            captured.add(sent);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  capturingCollector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);

    store.put("a", "b");

    assertFalse(captured.isEmpty());
}
/**
 * Repeatedly sends every entry of {@code inputValues} to {@code topic} until
 * {@code numIterations} iterations have completed or {@code shutdown} is set.
 */
@Override
public void run() {
    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
             new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
        for (;;) {
            // Stop once the iteration budget is used up or a shutdown was requested.
            if (getCurrIteration() >= numIterations || shutdown) {
                break;
            }
            for (final String value : inputValues) {
                final ProducerRecord<String, String> outgoing = new ProducerRecord<String, String>(topic, value);
                producer.send(outgoing);
            }
            incrementInteration();
        }
    }
}
/**
 * Creates {@code MSG_SIZE} stock quotations and submits one {@code KafkaProducerThread} per
 * quotation to a three-thread pool.
 *
 * <p>On the way out the pool is shut down and given time to finish the submitted tasks
 * before the shared producer is closed, so queued tasks are not run against a closed
 * producer.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
        for (int i = 0; i < MSG_SIZE; i++) {
            StockQuotationInfo quotationInfo = createQuotationInfo();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, null,
                quotationInfo.getTradeTime(), quotationInfo.getStockCode(), quotationInfo.toString());
            executor.execute(new KafkaProducerThread(producer, record));
        }
        Thread.sleep(2000L);
    } catch (Exception e) {
        logger.error("Send message occurs exception", e);
    } finally {
        try {
            // Stop accepting work, then wait (bounded) for the submitted sends to finish.
            executor.shutdown();
            executor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Close the producer last, and in every case.
            producer.close();
        }
    }
}
/**
 * Builds the membership record for {@code resource} based on its container {@code parent}.
 *
 * <p>Membership quads are collected in a temporary dataset. If at least one of them has an
 * IRI subject, the audit quads of {@code dataset} (mapped with {@code auditTypeMapper}) are
 * added and the serialized temporary dataset is returned as the record value, keyed by that
 * subject's IRI string.
 *
 * @param topic the topic of the resulting record
 * @param resource the resource used as subject or object of the direct-container quads
 * @param parent the container whose interaction model and membership settings are consulted
 * @param dataset the dataset from which user-managed and audit quads are read
 * @return the record, or an empty {@code Optional} when no membership quad with an IRI
 *         subject was produced
 * @throws Exception declared by the signature (the temporary dataset is closed on exit)
 */
private Optional<ProducerRecord<String, String>> buildMembershipMessage(final String topic, final IRI resource,
        final Resource parent, final Dataset dataset) throws Exception {
    try (final Dataset data = rdf.createDataset()) {
        if (DirectContainer.equals(parent.getInteractionModel())) {
            // Direct container: relate the membership resource and the new resource directly,
            // in whichever direction(s) the parent declares.
            parent.getMembershipResource().ifPresent(member -> {
                parent.getMemberRelation().ifPresent(relation ->
                        data.add(rdf.createQuad(PreferMembership, member, relation, resource)));
                parent.getMemberOfRelation().ifPresent(relation ->
                        data.add(rdf.createQuad(PreferMembership, resource, relation, member)));
            });
        } else if (IndirectContainer.equals(parent.getInteractionModel())) {
            // Indirect container: the member objects are taken from the user-managed quads of
            // the incoming dataset whose predicate is the inserted-content relation.
            parent.getMembershipResource().ifPresent(member ->
                    parent.getMemberRelation().ifPresent(relation ->
                        parent.getInsertedContentRelation().ifPresent(inserted ->
                            dataset.stream(of(PreferUserManaged), null, inserted, null).sequential().forEachOrdered(q ->
                                data.add(rdf.createQuad(PreferMembership, member, relation, q.getObject()))))));
        }
        // The record key is the IRI string of the first membership quad subject that is an IRI.
        final Optional<String> key = data.stream(of(PreferMembership), null, null, null).map(Quad::getSubject)
            .filter(x -> x instanceof IRI).map(x -> (IRI) x).map(IRI::getIRIString).findFirst();
        if (key.isPresent()) {
            // Only when there is something to send are the mapped audit quads added as well.
            dataset.stream(of(PreferAudit), null, null, null).map(auditTypeMapper).forEachOrdered(data::add);
            return of(new ProducerRecord<>(topic, key.get(), serialize(data)));
        }
        return empty();
    }
}
/**
 * Sends one record ({@code topicName}, {@code key}, {@code value}) synchronously and prints
 * the partition and offset it was written to. The producer is always closed at the end.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    Properties settings = new Properties();
    settings.put("bootstrap.servers", "localhost:9092,localhost:9093");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(settings);
    ProducerRecord<String, String> outgoing = new ProducerRecord<String, String>(topicName, key, value);

    try {
        // Block until the broker has acknowledged the record.
        RecordMetadata ack = sampleProducer.send(outgoing).get();
        System.out.printf("Message sent to Partition No. %d and offset %d",
            ack.partition(), ack.offset());
    } catch (Exception e) {
        LOGGER.error("exception occured while sending message to broker", e);
    } finally {
        sampleProducer.close();
    }
}
/** * @param args */ public static void main(String[] args) { Properties props=new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props); // ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, value); // sampleProducer.send(record); for (int i = 0; i < 10; i++) sampleProducer.send(new ProducerRecord<String, String>("demo-topic1","Data:"+ Integer.toString(i))); sampleProducer.close(); }
/**
 * Sending a source record whose timestamp is {@code -3} is expected to raise
 * {@code InvalidRecordException}.
 */
@Test(expected = InvalidRecordException.class)
public void testSendRecordsCorruptTimestamp() throws Exception {
    final Long corruptTimestamp = -3L;
    createWorkerTask();

    SourceRecord corruptRecord = new SourceRecord(
        PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, corruptTimestamp);
    List<SourceRecord> pending = Collections.singletonList(corruptRecord);

    Capture<ProducerRecord<byte[], byte[]>> captured = expectSendRecordAnyTimes();

    PowerMock.replayAll();

    // Inject the pending records and trigger the private send routine.
    Whitebox.setInternalState(workerTask, "toSend", pending);
    Whitebox.invokeMethod(workerTask, "sendRecords");
    assertEquals(null, captured.getValue().timestamp());

    PowerMock.verifyAll();
}
public void publishDummyData() { final String topic = "TestTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<String, String> producer = new KafkaProducer<>(config); for (int charCode = 65; charCode < 91; charCode++) { final char[] key = new char[1]; key[0] = (char) charCode; producer.send(new ProducerRecord<>(topic, new String(key), new String(key))); } producer.flush(); producer.close(); }
public void publishDummyDataNumbers() { final String topic = "NumbersTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config); for (int value = 0; value < 10000; value++) { producer.send(new ProducerRecord<>(topic, value, value)); } producer.flush(); producer.close(); }
/**
 * Creates {@code MSG_SIZE} Avro stock quotations and sends each one keyed by its stock code,
 * pausing two seconds after the 1st, 11th, 21st, ... message. The producer is closed at the
 * end in every case.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    logger.info("---------");
    try {
        int sentSoFar = 0;
        for (int i = 0; i < MSG_SIZE; i++) {
            AvroStockQuotation quotation = createQuotationInfo();
            ProducerRecord<String, AvroStockQuotation> record =
                new ProducerRecord<String, AvroStockQuotation>(
                    KafkaConstants.TOPIC_AVRO_STOCK_QUOTATION,
                    (String) quotation.getStockCode(),
                    quotation);
            // NOTE(review): this callback is created but never passed to send() — confirm
            // whether send(record, callback) was intended.
            QuotationCallback callback = new QuotationCallback();
            System.out.println("--------" + sentSoFar);
            producer.send(record);

            // Pause after every tenth message, counting from the first one.
            boolean pauseNow = sentSoFar % 10 == 0;
            sentSoFar++;
            if (pauseNow) {
                Thread.sleep(2000L);
            }
        }
    } catch (InterruptedException e) {
        logger.error("Send message occurs exception", e);
    } finally {
        producer.close();
    }
}
/**
 * Forwards {@code record} to the gap topic if its timestamp, treated as a transaction id,
 * is contained in {@code txIds}; the matching id is removed from the list.
 *
 * @param txIds transaction ids still being looked for; modified in place
 * @param record the consumed record to check
 */
private void processSingleRecord(List<Long> txIds, ConsumerRecord<ByteBuffer, ByteBuffer> record) {
    // Box explicitly so the element-removing overload (not remove-by-index) is used.
    Long txId = Long.valueOf(record.timestamp());
    if (!txIds.remove(txId)) {
        return;
    }
    producer.send(new ProducerRecord<ByteBuffer, ByteBuffer>(
        clusterConfig.getGapTopic(), record.key(), record.value()));
}
/**
 * Writes the given records to Kafka one at a time, waiting for each send to be acknowledged
 * before the next one is issued.
 *
 * <p>The producer is opened in a try-with-resources block so it is closed even when a send
 * fails.
 *
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 * @throws ExecutionException   if a send fails
 * @throws InterruptedException if interrupted while waiting for a send to complete
 */
public static <K, V> void produceKeyValuesSynchronously(
    String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
    throws ExecutionException, InterruptedException {
    try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
        for (KeyValue<K, V> record : records) {
            Future<RecordMetadata> f = producer.send(
                new ProducerRecord<>(topic, record.key, record.value));
            f.get();
        }
        producer.flush();
    }
}
private synchronized void finishSuccessfulFlush() { // If we were successful, we can just swap instead of replacing items back into the original map IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages; outstandingMessages = outstandingMessagesBacklog; outstandingMessagesBacklog = temp; flushing = false; }
/**
 * Reads the next output record from {@code topic} and deserializes its key and value. The
 * records are those output by the topology during earlier calls to
 * {@link #process(String, byte[], byte[])}.
 *
 * @param topic the name of the topic
 * @param keyDeserializer the deserializer for the key type
 * @param valueDeserializer the deserializer for the value type
 * @return the next record on that topic with typed key and value, or null if there is no
 *         record available
 */
public <K, V> ProducerRecord<K, V> readOutput(final String topic,
                                              final Deserializer<K> keyDeserializer,
                                              final Deserializer<V> valueDeserializer) {
    final ProducerRecord<byte[], byte[]> raw = readOutput(topic);
    if (raw != null) {
        final K typedKey = keyDeserializer.deserialize(raw.topic(), raw.key());
        final V typedValue = valueDeserializer.deserialize(raw.topic(), raw.value());
        // Topic, partition and timestamp are carried over from the raw record.
        return new ProducerRecord<>(raw.topic(), raw.partition(), raw.timestamp(), typedKey, typedValue);
    }
    return null;
}
/**
 * Produces a message with the given key and value to {@code topic}.
 *
 * <p>The sent counter is incremented before the send is attempted. If the send call throws,
 * an error line built by {@code errorString} is printed to standard output.
 *
 * @param key the record key
 * @param value the record value
 */
public void send(String key, String value) {
    ProducerRecord<String, String> outgoing = new ProducerRecord<String, String>(topic, key, value);
    numSent++;
    try {
        PrintInfoCallback onCompletion = new PrintInfoCallback(key, value);
        producer.send(outgoing, onCompletion);
    } catch (Exception e) {
        // Printing is serialized on System.out.
        synchronized (System.out) {
            String report = errorString(e, key, value, System.currentTimeMillis());
            System.out.println(report);
        }
    }
}
/**
 * Sends the transaction held by {@code txWrapper} to {@code topic}.
 *
 * <p>The transaction id is passed in the record's timestamp position, the partition comes
 * from {@code TransactionMessageUtil.partitionFor}, and metadata and data are used as key
 * and value. Completion is reported to a {@code LoggingErrorHandler}.
 *
 * @param txWrapper the transaction to write
 */
public void writeTransactions(TransactionWrapper txWrapper) {
    long txId = txWrapper.id();
    int targetPartition =
        TransactionMessageUtil.partitionFor(txWrapper.deserializedMetadata(), partitions);

    producer.send(
        new ProducerRecord<>(topic, targetPartition, txId, txWrapper.metadata(), txWrapper.data()),
        new LoggingErrorHandler());
}
/**
 * Generates random comma-separated events (phone number, two four-digit numbers and a
 * random date), prints each one and sends it, without a key, to {@code storm-trident-diy}.
 *
 * @param args {@code args[0]} is the number of events to generate
 */
public static void main(String args[]) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");
    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);

    int requested = Integer.parseInt(args[0]);
    for (int sent = 0; sent < requested; sent++) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long phoneNumber = rnd.nextLong(9999999950l, 9999999960l);
        int bin = rnd.nextInt(1000, 9999);
        int bout = rnd.nextInt(1000, 9999);
        // Fields are joined in the order: phone number, bin, bout, random date.
        String event = phoneNumber + "," + bin + "," + bout + "," + new Date(rnd.nextLong());
        System.out.println(event);

        ProducerRecord<Integer, String> data =
            new ProducerRecord<Integer, String>("storm-trident-diy", event);
        producer.send(data);
    }
    producer.close();
}
/**
 * Starts heartbeat detection.
 *
 * <p>Creates a timer that, after an initial delay of 10000 ms and then every 60000 ms, sends
 * a heartbeat record to {@code topic}. When a send completes without an exception, the
 * appender is marked as started, the heartbeat is stopped and a restart entry is written
 * through {@code zkRegister}.
 */
public void heartbeatStart() {
    // Initialize the heartbeat-detection timer
    this.timer = new Timer();
    this.timer.schedule(new TimerTask() {
        @Override
        public void run() {
            // The record key is the 4-byte encoding of the heartbeat key's hash code.
            byte[] key = ByteBuffer.allocate(4).putInt(Constants.HEARTBEAT_KEY.hashCode()).array();
            final ProducerRecord<byte[], String> record = new ProducerRecord<>(topic, key, Constants.HEARTBEAT_VALUE);

            // java 8 lambda
            // LazySingletonProducer.getInstance(config).send(record, (RecordMetadata recordMetadata, Exception e) -> {
            //     logic code
            // });

            LazySingletonProducer.getInstance(config).send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null == e) {
                        // Update the flag state
                        flag.compareAndSet(false, true);
                        // No exception means Kafka has switched from the abnormal state back
                        // to normal, so the started state is set to true
                        started = true;
                        // Here "this" is the Callback instance, not the appender.
                        addStatus(new InfoStatus("kafka send normal in appender", this, e));
                        // Shut down the heartbeat-detection mechanism
                        KafkaAppender.this.heartbeatStop();
                        zkRegister.write(Constants.SLASH + app + Constants.SLASH + host, NodeMode.EPHEMERAL,
                                String.valueOf(Constants.APP_APPENDER_RESTART_KEY + Constants.SEMICOLON + System.currentTimeMillis()) + Constants.SEMICOLON + SysUtil.userDir);
                    }
                }
            });
        }
    }, 10000,60000);
}
/**
 * Sends the same small JSON document ten times to the {@code produktion} topic, each time
 * under a random numeric key, and prints the elapsed time in whole seconds.
 *
 * @param args command-line arguments (not used)
 * @throws InterruptedException declared by the signature
 */
public static void main(String[] args) throws InterruptedException {
    Properties settings = new Properties();
    settings.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    settings.put(ACKS_CONFIG, "all");
    settings.put(RETRIES_CONFIG, 0);
    settings.put(BATCH_SIZE_CONFIG, 0);
    settings.put(LINGER_MS_CONFIG, 0);
    settings.put(BUFFER_MEMORY_CONFIG, 33554432);
    settings.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    settings.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(settings);

    // Payload shared by all records.
    String msg = Json.createObjectBuilder()
        .add("windrad", 6)
        .add("kw/h",33)
        .build()
        .toString();

    long startedAt = System.currentTimeMillis();

    int i = 1;
    while (i <= 10) {
        String key = String.valueOf(round(random() * 1000));
        // Computed but not used by the send below.
        double value = new Double(round(random()*10000000L)).intValue()/1000.0;
        producer.send(new ProducerRecord<>("produktion", key, msg));
        i++;
    }

    System.out.println("Zeit: " + (System.currentTimeMillis() - startedAt)/1000 + "s");

    producer.close();
}