@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    p.initTransactions();
    p.beginTransaction();
    Future<RecordMetadata> res = p.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    RecordMetadata record;
    try {
        record = res.get();
        offsets.clear();
        // record the offset of the produced record as part of the transaction
        offsets.put(new TopicPartition(topic, record.partition()),
                new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException | ExecutionException e) {
        p.abortTransaction();
    }
}
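For comparison, here is a minimal sketch of the more common consume-transform-produce transaction loop, where initTransactions() is called once up front and the consumed offsets are committed through the producer. The topic names, group id, and poll timeout are assumptions made for illustration:

// "orders-out" and "order-group" are assumed names, not taken from the snippet above
producer.initTransactions();
while (running) {
    ConsumerRecords<Long, byte[]> records = consumer.poll(100);
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<Long, byte[]> rec : records) {
            producer.send(new ProducerRecord<>("orders-out", rec.key(), rec.value()));
            // commit the offset of the next record to consume from the input partition
            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                    new OffsetAndMetadata(rec.offset() + 1));
        }
        producer.sendOffsetsToTransaction(offsets, "order-group");
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
}

The offsets map uses offset + 1 because a committed offset is interpreted as the position of the next record to read.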
@Override
public void commit() {
    HashMap<TopicPartition, OffsetAndMetadata> offsets =
            new HashMap<TopicPartition, OffsetAndMetadata>();
    partitionOffset.forEach((key, value) -> {
        String topic = key.split("\\+")[0];
        int partition = Integer.valueOf(key.split("\\+")[1]);
        // commit the offset of the next record to be consumed
        offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(value + 1));
    });
    consumer.commitSync(offsets);
    committed.set(true);
    partitionOffset.clear();
    // record the time of the commit
    timerCTX.stop();
    stat.newestCompleted = newestRecord;
    stat.delay = new Date().getTime() - start.getTime();
}
public void recommitOffsets() {
    LocalDateTime now = LocalDateTime.now(clock);
    if (now.isAfter(lastUpdateTime.plus(IDLE_DURATION))) {
        for (TopicPartition tp : offsetData.keySet()) {
            OffsetAndTime offsetAndTime = offsetData.get(tp);
            if (now.isAfter(offsetAndTime.time.plus(IDLE_DURATION))) {
                try {
                    consumer.commitSync(Collections.singletonMap(tp,
                            new OffsetAndMetadata(offsetAndTime.offset)));
                } catch (CommitFailedException e) {
                    logger.info("Caught CommitFailedException attempting to commit {} {}",
                            tp, offsetAndTime.offset);
                }
                offsetAndTime.time = now;
            }
        }
        lastUpdateTime = now;
    }
}
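A sketch of how such a committer might be driven from the consumer's poll loop, so offsets of idle partitions are re-committed before the broker expires them. The process() step and the running flag are assumptions; since KafkaConsumer is not thread-safe, the re-commit check belongs on the polling thread:

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    process(records);                       // hypothetical application processing
    offsetCommitter.recommitOffsets();      // no-op until IDLE_DURATION has elapsed
}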
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client, String topicStr) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
    List<String> partitions = asJavaListConverter(
            client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
    for (String partition : partitions) {
        TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
        Option<String> data = client.readDataMaybeNull(
                topicDirs.consumerOffsetDir() + "/" + partition)._1();
        if (data.isDefined()) {
            Long offset = Long.valueOf(data.get());
            offsets.put(key, new OffsetAndMetadata(offset));
        }
    }
    return offsets;
}
/**
 * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
 * returned future can be polled to get the actual offsets returned from the broker.
 *
 * @param partitions The set of partitions to get offsets for.
 * @return A request future containing the committed offsets.
 */
// build and cache the OffsetFetchRequest
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
    // construct the request
    OffsetFetchRequest.Builder requestBuilder =
            new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions));

    // send the request with a callback; the response is handled by OffsetFetchResponseHandler
    return client.send(coordinator, requestBuilder)
            .compose(new OffsetFetchResponseHandler());
}
public void run() {
    try {
        printJson(new StartupComplete());
        consumer.subscribe(Collections.singletonList(topic), this);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);

            if (!useAutoCommit) {
                if (useAsyncCommit)
                    consumer.commitAsync(offsets, this);
                else
                    commitSync(offsets);
            }
        }
    } catch (WakeupException e) {
        // ignore, we are closing
    } finally {
        consumer.close();
        printJson(new ShutdownComplete());
        shutdownLatch.countDown();
    }
}
@Test
public void testPutFlush() {
    HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    final String newLine = System.getProperty("line.separator");

    // We do not call task.start() since it would override the output stream
    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
    ));
    offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1" + newLine, os.toString());

    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
            new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
    ));
    offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
    offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString());
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (singleKinesisProducerPerPartition) {
        producerMap.values().forEach(producer -> {
            if (flushSync)
                producer.flushSync();
            else
                producer.flush();
        });
    } else {
        if (flushSync)
            kinesisProducer.flushSync();
        else
            kinesisProducer.flush();
    }
}
@Test
public void testOnCommitChain() {
    List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
    // we are testing two different interceptors by configuring the same interceptor differently,
    // which is not how it would be done in KafkaConsumer, but ok for testing interceptor callbacks
    FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
    FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
    interceptorList.add(interceptor1);
    interceptorList.add(interceptor2);
    ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);

    // verify that onCommit is called for all interceptors in the chain
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(tp, new OffsetAndMetadata(0));
    interceptors.onCommit(offsets);
    assertEquals(2, onCommitCount);

    // verify that even if one of the interceptors throws an exception, all interceptors' onCommit are called
    interceptor1.injectOnCommitError(true);
    interceptors.onCommit(offsets);
    assertEquals(4, onCommitCount);

    interceptors.close();
}
private void seekToMissingTransactions(Map<TopicPartition, List<Long>> txByPartition) {
    Map<TopicPartition, Long> timestamps = txByPartition.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> Collections.min(entry.getValue())
            ));
    Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(timestamps);
    Map<TopicPartition, OffsetAndMetadata> toCommit = foundOffsets.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> {
                        long offset = entry.getValue() != null ? entry.getValue().offset() : 0;
                        return new OffsetAndMetadata(offset);
                    }
            ));
    consumer.commitSync(toCommit);
}
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
                              String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());

        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());
        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
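Both methods above reposition a consumer group by committing the offsets that offsetsForTimes() resolves from timestamps. A standalone sketch of the same idea that also seeks the live consumer; the topic name and timestampMs are assumptions:

Map<TopicPartition, Long> query = new HashMap<>();
query.put(new TopicPartition("events", 0), timestampMs);    // first offset at or after timestampMs

Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet()) {
    if (entry.getValue() == null) {
        continue;                                           // no record at or after the timestamp
    }
    consumer.seek(entry.getKey(), entry.getValue().offset());
    consumer.commitSync(Collections.singletonMap(
            entry.getKey(), new OffsetAndMetadata(entry.getValue().offset())));
}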
public Map<TopicPartition, OffsetAndMetadata> calculateChangedOffsets(List<List<TransactionWrapper>> txToCommit) {
    if (txToCommit.isEmpty()) {
        return Collections.emptyMap();
    }
    Lazy<TopicPartition, MutableLongList> offsetsFromTransactions = calculateOffsetsFromTransactions(txToCommit);

    Collection<TopicPartition> allTopics = new HashSet<>(offsets.keySet());
    allTopics.addAll(offsetsFromTransactions.keySet());

    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
    for (TopicPartition topic : allTopics) {
        OffsetHolder offsetHolder = offsets.get(topic);
        long currentOffset = offsetHolder.getLastDenseOffset();
        long updatedOffset = MergeHelper.mergeWithDenseCompaction(offsetsFromTransactions.get(topic),
                offsetHolder.getSparseCommittedOffsets(), currentOffset);
        if (updatedOffset != INITIAL_SYNC_POINT && updatedOffset != currentOffset) {
            offsetHolder.setLastDenseOffset(updatedOffset);
            result.put(topic, new OffsetAndMetadata(updatedOffset));
        }
    }
    return result;
}
@Override
public void commitAsync(final OffsetCommitCallback callback) {
    Retries.tryMe(new IgniteInClosure<RetryRunnableAsyncOnCallback>() {
        @Override
        public void apply(final RetryRunnableAsyncOnCallback retryRunnableAsyncOnCallback) {
            inner.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    callback.onComplete(offsets, exception);
                    if (exception != null) {
                        retryRunnableAsyncOnCallback.retry(exception);
                    }
                }
            });
        }
    }, strategy());
}
@Test
public void topicSubscription() {
    state.subscribe(singleton(topic), rebalanceListener);
    assertEquals(1, state.subscription().size());
    assertTrue(state.assignedPartitions().isEmpty());
    assertTrue(state.partitionsAutoAssigned());
    state.assignFromSubscribed(singleton(tp0));
    state.seek(tp0, 1);
    state.committed(tp0, new OffsetAndMetadata(1));
    assertAllPositions(tp0, 1L);
    state.assignFromSubscribed(singleton(tp1));
    assertTrue(state.isAssigned(tp1));
    assertFalse(state.isAssigned(tp0));
    assertFalse(state.isFetchable(tp1));
    assertEquals(singleton(tp1), state.assignedPartitions());
}
private void rewind() {
    Map<TopicPartition, Long> offsets = context.offsets();
    if (offsets.isEmpty()) {
        return;
    }
    for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
        TopicPartition tp = entry.getKey();
        Long offset = entry.getValue();
        if (offset != null) {
            log.trace("Rewind {} to offset {}.", tp, offset);
            consumer.seek(tp, offset);
            lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
            currentOffsets.put(tp, new OffsetAndMetadata(offset));
        } else {
            log.warn("Cannot rewind {} to null offset.", tp);
        }
    }
    context.clearOffsets();
}
private void expectRebalanceAssignmentError(RuntimeException e) {
    final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);

    sinkTask.close(new HashSet<>(partitions));
    EasyMock.expectLastCall();

    sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
    EasyMock.expectLastCall().andReturn(Collections.emptyMap());

    EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
    EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);

    sinkTask.open(partitions);
    EasyMock.expectLastCall().andThrow(e);

    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    rebalanceListener.getValue().onPartitionsRevoked(partitions);
                    rebalanceListener.getValue().onPartitionsAssigned(partitions);
                    return ConsumerRecords.empty();
                }
            });
}
@Test
public void testCommitOffsetOnly() {
    subscriptions.assignFromUser(singleton(t1p));

    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));

    AtomicBoolean success = new AtomicBoolean(false);
    coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
    coordinator.invokeCompletedOffsetCommitCallbacks();
    assertTrue(success.get());

    assertEquals(100L, subscriptions.committed(t1p).offset());
}
private void commitInvalidOffsets() {
    final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
            StringDeserializer.class,
            StringDeserializer.class));

    final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
    invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));

    consumer.commitSync(invalidOffsets);
    consumer.close();
}
@Test
public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
    final String changelogName = "test-application-my-store-changelog";
    final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
    consumer.assign(partitions);

    final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
    committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
    consumer.commitSync(committedOffsets);

    restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
            new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));

    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("topic").groupByKey().count("my-store");

    final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
    StreamsConfig config = createConfig(baseDir);
    new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
            new MockStreamsMetrics(new Metrics()), stateDirectory);
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.pause(tp1); // paused partition does not have a valid position
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
            listOffsetResponse(Errors.NONE, 1L, 10L));
    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
/**
 * Refresh the committed offsets for provided partitions.
 */
// Send an OffsetFetchRequest to pull the latest committed offsets from the broker
// and record them in the SubscriptionState for the assigned partitions.
public void refreshCommittedOffsetsIfNeeded() {
    if (subscriptions.refreshCommitsNeeded()) {
        // fetch the committed offsets via the ConsumerNetworkClient
        Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            // verify assignment is still active
            if (subscriptions.isAssigned(tp))
                // record the committed offset
                this.subscriptions.committed(tp, entry.getValue());
        }
        this.subscriptions.commitsRefreshed();
    }
}
/**
 * Fetch the current committed offsets from the coordinator for a set of partitions.
 * @param partitions The partitions to fetch offsets for
 * @return A map from partition to the committed offset
 */
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    while (true) {
        // make sure the group coordinator is known and ready
        ensureCoordinatorReady();

        // contact coordinator to fetch committed offsets
        // build and cache the OffsetFetchRequest
        RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        // block until the request completes
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        // back off before retrying
        time.sleep(retryBackoffMs);
    }
}
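The two methods above are consumer-coordinator internals; from application code the last committed offset of a partition is available through the public consumer API. A minimal sketch, with the topic name and partition chosen for illustration:

TopicPartition tp = new TopicPartition("events", 0);
// blocking lookup of this group's last committed offset; returns null if nothing was committed
OffsetAndMetadata committed = consumer.committed(tp);
if (committed != null) {
    System.out.println("last committed offset for " + tp + ": " + committed.offset());
}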
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // mark that committed offsets need to be refreshed (e.g. after a rebalance)
    this.subscriptions.needRefreshCommits();
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            Exception commitException = e;

            if (e instanceof RetriableException)
                commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage());

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}
private void doAutoCommitOffsetsAsync() {
    Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
    log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);

    commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId,
                        exception.getMessage());
                if (exception instanceof RetriableException)
                    nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
            } else {
                log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId);
            }
        }
    });
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.pause(tp1); // paused partition does not have a valid position
    subscriptions.seek(tp1, 10);

    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (null != exception) {
        // the asynchronous commit failed
        LOGGER.error("commit failed for offsets {}, and exception is {}", offsets, exception);
    }
}
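A callback like this is passed to commitAsync(). A sketch of how it might be wired into a poll loop; the process() step and running flag are assumptions:

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    process(records);                       // hypothetical application processing
    // commit the current consumed positions without blocking the poll loop
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                LOGGER.error("commit failed for offsets {}", offsets, exception);
            }
        }
    });
}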
/**
 * Flush all records that have been {@link #put(Collection)} for the specified topic-partitions.
 *
 * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
 *                       provided for convenience but could also be determined by tracking all offsets
 *                       included in the {@link SinkRecord}s passed to {@link #put}.
 */
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
        TopicPartition tp = entry.getKey();
        OffsetAndMetadata om = entry.getValue();
        log.trace("Flushing up to topic {}, partition {} and offset {}", tp.topic(), tp.partition(), om.offset());
    }
    writer.commit();
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.seek(tp1, 10);
    subscriptions.pause(tp1); // paused partition already has a valid position

    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
@Test
public void testCommitOffsetAsyncWithDefaultCallback() {
    int invokedBeforeTest = mockOffsetCommitCallback.invoked;
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
    coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)),
            mockOffsetCommitCallback);
    coordinator.invokeCompletedOffsetCommitCallbacks();
    assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
    assertNull(mockOffsetCommitCallback.exception);
}
private void checkIfRefreshCommitRequired() {
    // The retention of __consumer_offsets is often shorter than that of the topics themselves, so we need to
    // re-commit regularly to keep the last committed offset per consumer group. This is especially an issue
    // in cases where we have bursty / little traffic.
    Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
    long now = System.currentTimeMillis();

    if (nextCommitRefreshRequiredTimestamp < now) {
        nextCommitRefreshRequiredTimestamp = now + COMMIT_REFRESH_INTERVAL_MILLIS;

        for (PartitionProcessor processor : partitions.allProcessors()) {
            TopicPartition assignedPartition = processor.getAssignedPartition();
            long lastCommittedOffset = processor.getLastCommittedOffset();

            // we haven't committed from this partition yet
            if (lastCommittedOffset < 0) {
                OffsetAndMetadata offset = kafka.committed(assignedPartition);
                if (offset == null) {
                    // there was no commit on this partition at all
                    continue;
                }
                lastCommittedOffset = offset.offset();
                processor.forceSetLastCommittedOffset(lastCommittedOffset);
            }

            commitOffsets.put(assignedPartition, new OffsetAndMetadata(lastCommittedOffset));
        }

        kafka.commitSync(commitOffsets);
        logger.info("Refreshing last committed offset {}", commitOffsets);
    }
}
@Test
public void testGroupAuthorizationFailureInFindCoordinator() {
    final String consumerGroupId = "consumer";
    final long pid = 13131L;
    final short epoch = 1;

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
            singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);

    prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
    sender.run(time.milliseconds());  // AddOffsetsToTxn handled, TxnOffsetCommit enqueued
    sender.run(time.milliseconds());  // FindCoordinator enqueued

    prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
    sender.run(time.milliseconds());  // FindCoordinator failed
    sender.run(time.milliseconds());  // TxnOffsetCommit aborted

    assertTrue(transactionManager.hasError());
    assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
    assertTrue(sendOffsetsResult.isCompleted());
    assertFalse(sendOffsetsResult.isSuccessful());
    assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);

    GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
    assertEquals(consumerGroupId, exception.groupId());

    assertAbortableError(GroupAuthorizationException.class);
}
@Test
public void subscriberLosesPartitionAssignment() {
    KafkaSubscriber<String> subscriber = new KafkaSubscriber<>(new MessageCallback(), "topic", "groupId", false,
            KafkaSubscriber.OffsetReset.Earliest, 1, 1, 1, 5000, 5000);
    KafkaTopicInfo message1 = new KafkaTopicInfo("topic", 0, 1, null);
    KafkaTopicInfo message2 = new KafkaTopicInfo("topic", 0, 2, null);
    KafkaTopicInfo message3 = new KafkaTopicInfo("topic", 1, 1, null);
    KafkaTopicInfo message4 = new KafkaTopicInfo("topic", 1, 2, null);
    subscriber.consume(message1);
    subscriber.consume(message2);
    subscriber.consume(message3);
    subscriber.consume(message4);

    KafkaConsumer realConsumer = mock(KafkaConsumer.class);

    class ArgMatcher implements ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>> {
        @Override
        public boolean matches(Map<TopicPartition, OffsetAndMetadata> arg) {
            OffsetAndMetadata oam = arg.values().iterator().next();
            return oam.offset() == 3;
        }
    }

    doThrow(new CommitFailedException()).when(realConsumer).commitSync(argThat(new ArgMatcher()));
    subscriber.realConsumer = realConsumer;
    subscriber.offsetCommitter = new OffsetCommitter(realConsumer, Clock.systemUTC());

    subscriber.consumeMessages();
}
@Test
public void offsetGetsCommitted() throws Exception {
    committer.offsetCommitted(Collections.singletonMap(new TopicPartition("a", 1), new OffsetAndMetadata(333L)));

    committer.recommitOffsets();
    verifyNoMoreInteractions(consumer);

    when(clock.instant()).thenReturn(Instant.now().plus(OffsetCommitter.IDLE_DURATION).plusMillis(1000));

    committer.recommitOffsets();
    // now it's time to work...
    verify(consumer, times(1)).commitSync(any(Map.class));
}
private void migrateOffsets(String topicStr) {
    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
            JaasUtils.isZkSecurityEnabled());
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps);
    try {
        Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer, topicStr);
        if (!kafkaOffsets.isEmpty()) {
            log.info("Found Kafka offsets for topic " + topicStr + ". Will not migrate from zookeeper");
            log.debug("Offsets found: {}", kafkaOffsets);
            return;
        }

        log.info("No Kafka offsets found. Migrating zookeeper offsets");
        Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils, topicStr);
        if (zookeeperOffsets.isEmpty()) {
            log.warn("No offsets to migrate found in Zookeeper");
            return;
        }

        log.info("Committing Zookeeper offsets to Kafka");
        log.debug("Offsets to commit: {}", zookeeperOffsets);
        consumer.commitSync(zookeeperOffsets);

        // Read the offsets to verify they were committed
        Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer, topicStr);
        log.debug("Offsets committed: {}", newKafkaOffsets);
        if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
            throw new FlumeException("Offsets could not be committed");
        }
    } finally {
        zkUtils.close();
        consumer.close();
    }
}
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
        KafkaConsumer<String, byte[]> client, String topicStr) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
    for (PartitionInfo partition : partitions) {
        TopicPartition key = new TopicPartition(topicStr, partition.partition());
        OffsetAndMetadata offsetAndMetadata = client.committed(key);
        if (offsetAndMetadata != null) {
            offsets.put(key, offsetAndMetadata);
        }
    }
    return offsets;
}
private synchronized ConsumerAndRecords createConsumerAndRecords() {
    try {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
        ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
        logger.info("Created new consumer to connect to Kafka");
        car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
        car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        consumers.add(car);
        return car;
    } catch (Exception e) {
        throw new FlumeException("Unable to connect to Kafka", e);
    }
}
private void migrateOffsets() {
    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
            JaasUtils.isZkSecurityEnabled());
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
    try {
        Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
        if (!kafkaOffsets.isEmpty()) {
            logger.info("Found Kafka offsets for topic " + topicStr + ". Will not migrate from zookeeper");
            logger.debug("Offsets found: {}", kafkaOffsets);
            return;
        }

        logger.info("No Kafka offsets found. Migrating zookeeper offsets");
        Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils);
        if (zookeeperOffsets.isEmpty()) {
            logger.warn("No offsets to migrate found in Zookeeper");
            return;
        }

        logger.info("Committing Zookeeper offsets to Kafka");
        logger.debug("Offsets to commit: {}", zookeeperOffsets);
        consumer.commitSync(zookeeperOffsets);

        // Read the offsets to verify they were committed
        Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer);
        logger.debug("Offsets committed: {}", newKafkaOffsets);
        if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
            throw new FlumeException("Offsets could not be committed");
        }
    } finally {
        zkUtils.close();
        consumer.close();
    }
}
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
        KafkaConsumer<String, byte[]> client) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
    for (PartitionInfo partition : partitions) {
        TopicPartition key = new TopicPartition(topicStr, partition.partition());
        OffsetAndMetadata offsetAndMetadata = client.committed(key);
        if (offsetAndMetadata != null) {
            offsets.put(key, offsetAndMetadata);
        }
    }
    return offsets;
}
public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<String, Object>();
    // bootstrap.servers lists one or more brokers; it does not need to include every broker,
    // since the client discovers the rest of the cluster automatically.
    configs.put("bootstrap.servers", "192.168.0.107:9092,192.168.0.108:9092,192.168.0.109:9092");
    configs.put("group.id", "kafka-test");
    // disable automatic offset commits
    configs.put("enable.auto.commit", "false");
    // interval between automatic offset commits (unused when auto-commit is disabled)
    configs.put("auto.commit.interval.ms", "1000");
    configs.put("session.timeout.ms", "30000");
    configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    // the consumer can subscribe to one or more topics at once
    consumer.subscribe(Arrays.asList("kafka-test"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            // synchronously commit this partition's position: the offset of the next record to consume
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
}
@Test
public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
    producer.initTransactions();
    producer.beginTransaction();
    try {
        producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), null);
        fail("Should have thrown NullPointerException");
    } catch (NullPointerException e) {
        // expected
    }
}