public void addPartitions(Map<TopicAndPartition, Long> partitionAndOffsets) { Utils.lockInterruptibly(partitionMapLock); try { for (Map.Entry<TopicAndPartition, Long> entry : partitionAndOffsets.entrySet()) { TopicAndPartition topicAndPartition = entry.getKey(); long offset = entry.getValue(); // If the partitionMap already has the topic/partition, then do not update the map with the old offset if (!partitionMap.containsKey(topicAndPartition)) partitionMap.put(topicAndPartition, PartitionTopicInfo.isOffsetInvalid(offset) ? handleOffsetOutOfRange(topicAndPartition) : offset); } partitionMapCond.signalAll(); } finally { partitionMapLock.unlock(); } }
public void pushToStream(String message) { int streamNo = (int) this.nextStream.incrementAndGet() % this.queues.size(); AtomicLong offset = this.offsets.get(streamNo); BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo); AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet()); List<Message> seq = Lists.newArrayList(); seq.add(new Message(message.getBytes(Charsets.UTF_8))); ByteBufferMessageSet messageSet = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq)); FetchedDataChunk chunk = new FetchedDataChunk(messageSet, new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"), thisOffset.get()); queue.add(chunk); }