private void sendDataToKafka(long batchId, byte[] data, Tuple input) { @SuppressWarnings("rawtypes") ProducerRecord record = new ProducerRecord<>(outputTopic, "", data); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { synchronized (collector) { if (e != null) { collector.fail(input); logger.error("kafka ack failed to the message which batchId is " + batchId, e); } else { collector.ack(input); logger.debug("kafka ack to the message which batchId is " + batchId, e); } }} }); }
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { /* // Create wrappedRecord because headers can be read only in record (if record is sent second time) ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), record.value(), record.headers()); */ try (Scope scope = buildAndInjectSpan(record)) { Callback wrappedCallback = new TracingCallback(callback, scope); return producer.send(record, wrappedCallback); } }
@Test public void test() throws Exception { Producer<Integer, String> producer = createProducer(); // Send 1 producer.send(new ProducerRecord<>("messages", 1, "test")); // Send 2 producer.send(new ProducerRecord<>("messages", 1, "test"), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { assertEquals("messages", metadata.topic()); } }); final CountDownLatch latch = new CountDownLatch(2); createConsumer(latch, 1); producer.close(); List<MockSpan> mockSpans = mockTracer.finishedSpans(); assertEquals(4, mockSpans.size()); checkSpans(mockSpans); assertNull(mockTracer.activeSpan()); }
/** * Send a CruiseControlMetric to the Kafka topic. * @param ccm the Cruise Control metric to send. */ public void sendCruiseControlMetric(CruiseControlMetric ccm) { // Use topic name as key if existing so that the same sampler will be able to collect all the information // of a topic. String key = ccm.metricClassId() == CruiseControlMetric.MetricClassId.TOPIC_METRIC ? ((TopicMetric) ccm).topic() : Integer.toString(ccm.brokerId()); ProducerRecord<String, CruiseControlMetric> producerRecord = new ProducerRecord<>(_cruiseControlMetricsTopic, null, ccm.time(), key, ccm); LOG.debug("Sending Cruise Control metric {}.", ccm); _producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { LOG.warn("Failed to send Cruise Control metric {}", ccm); _numMetricSendFailure++; } } }); }
@Before public void setUp() { super.setUp(); Properties props = new Properties(); props.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); AtomicInteger failed = new AtomicInteger(0); try (Producer<String, String> producer = createProducer(props)) { for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("TestTopic", Integer.toString(i)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { failed.incrementAndGet(); } } }); } } assertEquals(0, failed.get()); }
/*** * send stat info to statistic topic, do not care about success or not. * @param message */ private void sendTableStatInfo(StatMessage message) { String key = String.format("%s.%s.%s.%s.%s", message.getDsName(), message.getSchemaName(), message.getTableName(), message.getType(), message.getTxTimeMS()); String value = message.toJSONString(); Callback callback = new Callback() { @Override public void onCompletion(RecordMetadata ignored, Exception e) { if (e != null) { logger.error(String.format("Send statistic FAIL: toTopic=%s, key=%s", statTopic, key)); } else { logger.info(String.format(" Send statistic successful: toTopic=%s, key=(%s)", statTopic, key)); } } }; Future<RecordMetadata> result = producer.send(new ProducerRecord<>(statTopic, key, value), callback); }
@SuppressWarnings("unchecked") private void sendMessageToKafka(String key, DbusMessage dbusMessage, AtomicLong sendCnt, AtomicLong recvCnt, AtomicBoolean isError) throws Exception{ if(stringProducer == null) { throw new Exception("producer is null, can't send to kafka!"); } ProducerRecord record = new ProducerRecord<>(resultTopic, key, dbusMessage.toString()); sendCnt.getAndIncrement(); stringProducer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); isError.set(true); }else{ recvCnt.getAndIncrement(); } } }); }
@Override public void run() { System.out.println("Producing to topic " + topic); String numPartitions = System.getenv().getOrDefault("NUM_PARTITIONS", "1"); System.out.println("Total Partitions " + numPartitions); while (true) { try { producer.send(new ProducerRecord<>(topic, "key-" + rnd.nextInt(10), "val-" + rnd.nextInt(10)), new Callback() { @Override public void onCompletion(RecordMetadata record, Exception excptn) { System.out.println("Sent data to Offset " + record.offset() + " in Partition " + record.partition()); } }); Thread.sleep(Long.valueOf(producerPause)); } catch (Exception ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } }
@Override public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) { try { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { failedDeliveryCallback.onFailedDelivery(event, exception); } } }); return true; } catch (BufferExhaustedException e) { failedDeliveryCallback.onFailedDelivery(event, e); return false; } }
@Override public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) { return Retries.tryMe(new IgniteClosure<RetryCallableAsyncOnCallback, Future<RecordMetadata>>() { @Override public Future<RecordMetadata> apply(final RetryCallableAsyncOnCallback retryCallableAsyncOnCallback) { return inner.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { callback.onCompletion(metadata, exception); if (exception != null) { retryCallableAsyncOnCallback.retry(exception); } } }); } }); }
@Test public void putSafeWithNoPreviousValueIsPropagated() { final Converter converter = mock(Converter.class); final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class); final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); final byte[] value = new byte[0]; final Capture<Struct> statusValueStruct = newCapture(); converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), capture(statusValueStruct)); EasyMock.expectLastCall().andReturn(value); kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class)); expectLastCall(); replayAll(); final ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.FAILED, WORKER_ID, 0); store.putSafe(status); verifyAll(); assertEquals(status.state().toString(), statusValueStruct.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME)); assertEquals(status.workerId(), statusValueStruct.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME)); assertEquals(status.generation(), statusValueStruct.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME)); }
@SuppressWarnings("unchecked") @Test public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception { final AtomicInteger attempt = new AtomicInteger(0); final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { if (attempt.getAndIncrement() == 0) { throw new TimeoutException(); } return super.send(record, callback); } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); final Long offset = collector.offsets().get(new TopicPartition("topic1", 0)); assertEquals(Long.valueOf(0L), offset); }
/** * Try to append to a ProducerBatch. * * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written * and memory records built) in one of the following cases (whichever comes first): right before send, * if it is expired, or when the producer is closed. */ // 查找batches集合对应队列的最后一个ProducerBatch private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) { //拿到消息队列中的最后一个 ProducerBatch last = deque.peekLast(); if (last != null) { //调用ProducerBatch的tryAppend方法返回 FutureRecordMetadata future,MemoryRecordsBuilder中是还有空间 FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()); if (future == null) last.closeForRecordAppends(); else return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false); } return null; }
public Future<RecordMetadata> send(Object value, Callback callback) { DocumentMetadata documentMetadata = Metadata.getMetadata().getMetadataMap().get(value.getClass()); String topic = documentMetadata.getTopic(); Method method = documentMetadata.getIdMetadata().getMethod(); try { String key = String.valueOf(method.invoke(value)); return producer.send(new ProducerRecord<>(topic, key, value), callback); } catch (IllegalAccessException | InvocationTargetException e) { throw new JkesException( String.format("Can't invoke method[%s] on object[%s] of class[%s]", method, value, value.getClass()), e); } }
public ErrorLoggingCallback(UUID messageId, K key, V value, String topic, Long timestamp, Integer serializedSize, Auditor<K, V> auditor, Callback userCallback) { _messageId = messageId; _value = value; _key = key; _topic = topic; _timestamp = timestamp; _serializedSize = serializedSize; _auditor = auditor; _userCallback = userCallback; }
@Override public void run() { final Set<String> ackedMessages = new HashSet<>(); for (int i = 0; i < MESSAGE_COUNT; i++) { // The message size is set to 100 - 1124, So we should have most of the messages to be large messages // while still have some ordinary size messages. int messageSize = 100 + _random.nextInt() % 1024; final String messageId = UUID.randomUUID().toString().replace("-", ""); final String message = messageId + TestUtils.getRandomString(messageSize); _producer.send(new ProducerRecord<String, String>(_topic, null, (long) i, null, message), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { // The callback should have been invoked only once. assertFalse(ackedMessages.contains(messageId)); if (e == null) { ackedMessages.add(messageId); } _messages.put(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset(), message); } }); } }
@Test public void testLargeMessageCallbackWithoutException() { final AtomicInteger callbackFired = new AtomicInteger(0); LargeMessageCallback callback = new LargeMessageCallback(numSegments, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { callbackFired.incrementAndGet(); assertEquals("No exception should be there.", e, null); } }); for (int i = 0; i < numSegments - 1; i++) { callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null); assertTrue("The user callback should not be fired.", callbackFired.get() == 0); } callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null); assertTrue("The user callback should not be fired.", callbackFired.get() == 1); }
/** * Append the record to the current record set and return the relative offset within that record set * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. */ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) { if (!this.records.hasRoomFor(key, value)) { return null; } else { long checksum = this.records.append(offsetCounter++, timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); if (callback != null) thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }
@Test public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { assertTrue(exception.getMessage().equals("Producer is closed forcefully.")); numExceptionReceivedInCallback.incrementAndGet(); } } for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); accum.abortIncompleteBatches(); assertEquals(numExceptionReceivedInCallback.get(), 100); assertFalse(accum.hasUnsent()); }
public void publishMessage(String topic, String key, String message){ final Callback callback = (RecordMetadata m, Exception e) -> { if ( e == null ) { Log.log(Level.FINER, this, "Published Event"); } else { Log.log(Level.FINER, this, "Error publishing event", e); } }; if(producer!=null){ Log.log(Level.FINER, this, "Publishing Event {0} {1} {2}",topic,key,message); ProducerRecord<String,String> pr = new ProducerRecord<>(topic, key, message); producer.send(pr, callback); }else{ Log.log(Level.FINER, this, "Kafka Unavailable, ignoring event {0} {1} {2}",topic,key,message); } }
public void send(final String topic, final byte[] key, final byte[] msg) throws InterruptedException, ExecutionException { ProducerRecord<byte[], byte[]> rec = new ProducerRecord<byte[], byte[]>( topic, key, msg); Future<RecordMetadata> res = producer.send(rec, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error(" failed to send record to {}: {}", topic, e); logger.debug("Failed record: topic {}, key {}, value {}", topic, key, msg); } else { logger.trace( "Wrote record successfully: topic {} partition {} offset {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } } }); if (sync) { res.get(); } }
private String produceMessage(String topicName, Object msg) { String bootstrapServers = CLUSTER.bootstrapServers(); Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true)); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); final Producer<String, Object> producer = new KafkaProducer<>(config); final Callback callback = new ProducerCallback(); LOG.info("Sending message: [{}] to topic: [{}]", msg, topicName); ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topicName, getKey(msg), msg); producer.send(producerRecord, callback); producer.flush(); LOG.info("Message successfully sent to topic: [{}]", topicName); producer.close(5, TimeUnit.SECONDS); return bootstrapServers; }
/** * {@inheritDoc} * @see org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) */ @Override public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) { final long startTime = System.nanoTime(); try { final Future<RecordMetadata> f = producer.send(record, new Callback(){ @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { sendMessageResponse.update(System.nanoTime()-startTime, TimeUnit.NANOSECONDS); if(callback!=null) { callback.onCompletion(metadata, exception); } } }); sendCounter.inc(); sendMessage.update(System.nanoTime()-startTime, TimeUnit.NANOSECONDS); getTopicCounter(record.topic()).increment(); return f; } catch (Exception ex) { droppedMessages.inc(); log.error("Failed to send producer record", ex); return failedFuture(ex); } }
/** * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled, * the snapshot method does indeed finishes without waiting for pending records; * we set a timeout because the test will not finish if the logic is broken. */ @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); producer.setFlushOnCheckpoint(false); final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); final OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); testHarness.open(); testHarness.processElement(new StreamRecord<>("msg")); // make sure that all callbacks have not been completed verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class)); // should return even if there are pending records testHarness.snapshot(123L, 123L); testHarness.close(); }
@SuppressWarnings("unchecked") DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) { super(DUMMY_TOPIC, schema, producerConfig, partitioner); this.mockProducer = mock(KafkaProducer.class); when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { pendingCallbacks.add(invocationOnMock.getArgumentAt(1, Callback.class)); return null; } }); this.pendingCallbacks = new ArrayList<>(); this.flushLatch = new MultiShotLatch(); }
DLFutureRecordMetadata(final String topic, com.twitter.util.Future<DLSN> dlsnFuture, final Callback callback) { this.topic = topic; this.dlsnFuture = dlsnFuture; this.callback = callback; this.dlsnFuture.addEventListener(new FutureEventListener<DLSN>() { @Override public void onFailure(Throwable cause) { callback.onCompletion(null, new IOException(cause)); } @Override public void onSuccess(DLSN value) { callback.onCompletion(new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L), null); } }); }
@Test public void processAsyncSendsMessageWithException() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); // setup the exception here org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); Mockito.when(kp.send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class))).thenThrow(new ApiException()); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); producer.process(exchange, callback); Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class)); Mockito.verify(exchange).setException(Matchers.isA(ApiException.class)); Mockito.verify(callback).done(Matchers.eq(true)); }
public Supplier<Callback> getCallbackSupplier() { if (callbackSupplier == null) { final Logger logger = getLogger(); final boolean debug = logger.isDebugEnabled(); callbackSupplier = () -> (metadata, exception) -> { if (debug) { if (metadata != null) { logger.debug(metadata.toString()); } } if (exception != null) { logger.error("Unable to send message to kafka", exception); } }; } return callbackSupplier; }
protected void sendTuple(T tuple) { if (alreadyInKafka(tuple)) { return; } getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage()); throw new RuntimeException(e); } } }); }
public PrecipiceFuture<ProduceStatus, RecordMetadata> sendRecordAction(ProducerRecord<K, V> record) { final PrecipicePromise<ProduceStatus, RecordMetadata> promise = Asynchronous.acquirePermitsAndPromise(guardRail, 1L); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { promise.complete(ProduceStatus.SUCCESS, metadata); } else { if (exception instanceof TimeoutException) { promise.completeExceptionally(ProduceStatus.TIMEOUT, exception); } else if (exception instanceof NetworkException) { promise.completeExceptionally(ProduceStatus.NETWORK_EXCEPTION, exception); } else { promise.completeExceptionally(ProduceStatus.OTHER_ERROR, exception); } } } }); return promise.future(); }
public void publishMessage(String topic, String key, String message){ final Callback callback = (RecordMetadata m, Exception e) -> { if ( e == null ) { Log.log(Level.FINER, this, "Published Event"); } else { Log.log(Level.FINER, this, "Error publishing event", e); } }; if(producer!=null){ Log.log(Level.FINER, this, "Publishing Event {0} {1} {2}",topic,key,message); ProducerRecord<String,String> pr = new ProducerRecord<String,String>(topic, key, message); producer.send(pr, callback); } else{ Log.log(Level.FINER, this, "Kafka Unavailable, ignoring event {0} {1} {2}",topic,key,message); } }
/** * Converts {@link com.outbrain.aletheia.datum.production.DeliveryCallback} to {@link org.apache.kafka.clients.producer.Callback} * * @param deliveryCallback The callback provided by the user. * @param endpoint The endpoint for which the delivery callback will be invoked. * */ public Callback transform(final DeliveryCallback deliveryCallback, final EndPoint endpoint){ return new Callback() { @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { // It's guaranteed in Kafka API that exactly one of the arguments will be null if (metadata != null){ deliveryCallback.onSuccess( new EndpointDeliveryMetadata(endpoint)); } else { kafkaDeliveryFailure.inc(); deliveryCallback.onError( new EndpointDeliveryMetadata(endpoint), exception); } } }; }
@Override public List<Future<RecordMetadata>> save(List<KafkaLogEventDto> logEventDtoList, GenericAvroConverter<GenericRecord> eventConverter, GenericAvroConverter<GenericRecord> headerConverter, Callback callback) throws IOException { List<Future<RecordMetadata>> results = new ArrayList<Future<RecordMetadata>>(); LOG.info("[{}] Sending events to Kafka using {} key defining strategy", topicName, configuration .getKafkaKeyType().toString()); for (KafkaLogEventDto dto : logEventDtoList) { ProducerRecord<String, String> recordToWrite; if (configuration.getUseDefaultPartitioner()) { recordToWrite = new ProducerRecord<String, String>(topicName, getKey(dto), formKafkaJson(dto, eventConverter, headerConverter)); } else { recordToWrite = new ProducerRecord<String, String>(topicName, calculatePartitionId(dto), getKey(dto), formKafkaJson(dto, eventConverter, headerConverter)); } results.add(producer.send(recordToWrite, callback)); } return results; }
private static Thread[] createProducerThreads(KafkaClient kafkaClient, String topic, AtomicLong counterSent, int numThreads, final int numMsgs) { Thread[] result = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { result[i] = new Thread("Producer - " + i) { public void run() { Callback callback = new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { counterSent.incrementAndGet(); } } }; for (int i = 0; i < numMsgs; i++) { String content = i + ":" + idGen.generateId128Hex(); KafkaMessage msg = new KafkaMessage(topic, content); kafkaClient.sendMessageRaw(msg, callback); } } }; } return result; }
public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { if (producer != null) { byte[] newKey = null; if(key != null && key.contains("${")) { newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8); } else if (key != null) { newKey = key.getBytes(StandardCharsets.UTF_8); } final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg); if (syncSend) { final Future<RecordMetadata> response = producer.send(newRecord); response.get(timeoutMillis, TimeUnit.MILLISECONDS); } else { producer.send(newRecord, new Callback() { @Override public void onCompletion(final RecordMetadata metadata, final Exception e) { if (e != null) { LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e); } } }); } } }
public ProviderThread(String kafkaTopic, String serverList, ConcurrentLinkedQueue<String> metaLogQueue, ConcurrentLinkedQueue<String> commonLogQueue) { if (metaLogQueue == null) { log.error("meta queue obj is null..."); System.exit(7); } if (commonLogQueue == null) { log.error("common queue obj is null..."); System.exit(8); } if (serverList == null || serverList.length() == 0) { log.error("kafka server list is empty..."); System.exit(9); } if (kafkaTopic == null || kafkaTopic.length() == 0) { log.error("kafka topic is empty..."); System.exit(4); } callback = new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); log.error("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } } }; this.metaLogQueue = metaLogQueue; this.commonLogQueue = commonLogQueue; this.serverList = serverList; this.kafkaTopic = kafkaTopic; }
@Test public void sendFailsReturnsFalse() { KafkaProducer producer = mock(KafkaProducer.class); publisher.realProducer = producer; RecordMetadata metadata = new RecordMetadata(null, 0, 0, 0, Long.valueOf(0), 0, 0); ArgumentCaptor<Callback> captor = ArgumentCaptor.forClass(Callback.class); when(producer.send(any(), captor.capture())).then( invocation -> { captor.getValue().onCompletion(metadata, new TimeoutException("error")); return new CompletableFuture(); }); String[] events = { "test" }; assertThat(publisher.publishEvents(false, null, events)).isFalse(); }