@Test public void runNoData() throws Exception { when(iterator.hasNext()).thenReturn(false); final KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap()); verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(), any(byte[].class), anyMap()); verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), anyMap(), anyBoolean(), anyString(), anyString(), anyString()); verify(consumer_connector, times(1)) .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); verify(writer, times(1)).shutdown(); verify(consumer_connector, times(1)).shutdown(); }
@Test public void runNoDataRestart() throws Exception { when(iterator.hasNext()).thenReturn(false); final KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); writer.run(); verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap()); verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(), any(byte[].class), anyMap()); verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), anyMap(), anyBoolean(), anyString(), anyString(), anyString()); verify(consumer_connector, times(2)) .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); verify(writer, times(2)).shutdown(); verify(consumer_connector, times(2)).shutdown(); }
@Test public void runNoStreams() throws Exception { when(stream_list.get(0)) .thenThrow(new ArrayIndexOutOfBoundsException()); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap()); verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(), any(byte[].class), anyMap()); verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), anyMap(), anyBoolean(), anyString(), anyString(), anyString()); verify(consumer_connector, times(1)) .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); verify(writer, times(1)).shutdown(); verify(consumer_connector, times(1)).shutdown(); }
private void verifyMessageRead(final KafkaRpcPluginThread writer, final boolean requeued) { verify(writer, times(1)).shutdown(); verify(consumer_connector, times(1)).shutdown(); verify(consumer_connector, times(1)) .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); verify(iterator, times(2)).hasNext(); if (requeued) { verify(requeue, times(1)).handleError( any(IncomingDataPoint.class), any(Exception.class)); } else { verify(requeue, never()).handleError( any(IncomingDataPoint.class), any(Exception.class)); } if (data != null) { verify(rate_limiter, times(1)).acquire(); } }
@Override protected void onStart(DbzNode node) { logger.debug("DATABASES: Starting and subscribing to '{}'...", Topic.SCHEMA_UPDATES); // Add a single-threaded consumer that will read the "schema-updates" topic to get all database schema updates. // We use a unique group ID so that we get *all* the messages on this topic. int numThreads = 1; String groupId = "databases-" + node.id(); // unique so that all clients see all messages TopicFilter topicFilter = Topics.anyOf(Topic.SCHEMA_UPDATES); node.subscribe(groupId, topicFilter, numThreads, (topic, partition, offset, key, msg) -> { Document updatedSchema = Message.getAfter(msg); DatabaseId dbId = Identifier.parseDatabaseId(key); activeDatabases.put(dbId.asString(), new ActiveDatabase(dbId, updatedSchema)); logger.debug("DATABASES: Cached active database '{}'...", dbId); return true; }); }
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException { mConfig = config; mOffsetTracker = offsetTracker; mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { throw new RuntimeException("Topic filter and blacklist cannot be both specified."); } TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()): new Whitelist(mConfig.getKafkaTopicFilter()); LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); List<KafkaStream<byte[], byte[]>> streams = mConsumerConnector.createMessageStreamsByFilter(topicFilter); KafkaStream<byte[], byte[]> stream = streams.get(0); mIterator = stream.iterator(); mLastAccessTime = new HashMap<TopicPartition, Long>(); StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId()); mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds(); mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads(); mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); }
@Test public void runConsumerRuntimeException() throws Exception { when(consumer_connector.createMessageStreamsByFilter( (TopicFilter) any(), anyInt())).thenThrow( new RuntimeException("Foobar")); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); verify(writer, times(1)).shutdown(); verify(consumer_connector, times(1)).shutdown(); }
@Test(expected = Exception.class) public void runConsumerException() throws Exception { when(consumer_connector.createMessageStreamsByFilter( (TopicFilter) any(), anyInt())).thenThrow( new Exception("Foobar")); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); verify(writer, times(1)).shutdown(); verify(consumer_connector, times(1)).shutdown(); }
private void init() { // register kafka offset lag metrics, one Gauge is for per consumer level granularity MetricRegistry registry = Metrics.getRegistry(); try { fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate"); failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest"); kafkaOffsetLagGauge = registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge( new ObjectName(maxLagMetricName), "Value")); } catch (MalformedObjectNameException | IllegalArgumentException e) { logger.error("Register failure for metrics of KafkaIngesterConsumer", e); } TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME); logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME); this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0); iterator = stream.iterator(); logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName()); if (AuditConfig.INGESTER_ENABLE_DEDUP) { deduplicator = new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT, AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX, AuditConfig.INGESTER_HOSTS_WITH_DUP); deduplicator.open(); } else { deduplicator = null; } }
public WildcardTopicCount(ZKConnector<?> zkClient, String consumerIdString, TopicFilter topicFilter, Integer numStreams) { this.consumerIdString = consumerIdString; this.numStreams = numStreams; this.topicFilter = topicFilter; this.zkClient = zkClient; }
public static TopicFilter anyOf( String...topics) { StringJoiner joiner = new StringJoiner(","); for ( String topic : topics ) { joiner.add(topic); } return new Whitelist(joiner.toString()); }
public static TopicFilter noneOf( String...topics) { StringJoiner joiner = new StringJoiner(","); for ( String topic : topics ) { joiner.add(topic); } return new Blacklist(joiner.toString()); }
public Subscriber(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder, Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) { this.topicFilter = topicFilter; this.keyDecoder = keyDecoder; this.messageDecoder = messageDecoder; this.consumer = consumer; }
@SuppressWarnings("unchecked") @Before public void before() throws Exception { tsdb = PowerMockito.mock(TSDB.class); config = new KafkaRpcPluginConfig(new Config(false)); group = mock(KafkaRpcPluginGroup.class); message = mock(MessageAndMetadata.class); rate_limiter = mock(RateLimiter.class); requeue = mock(KafkaStorageExceptionHandler.class); counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>(); deserializer = new JSONDeserializer(); consumer_connector = mock(ConsumerConnector.class); mockStatic(Consumer.class); when(Consumer.createJavaConsumerConnector((ConsumerConfig) any())) .thenReturn(consumer_connector); when(tsdb.getConfig()).thenReturn(config); when(tsdb.getStorageExceptionHandler()).thenReturn(requeue); parent = mock(KafkaRpcPlugin.class); when(parent.getHost()).thenReturn(LOCALHOST); when(parent.getTSDB()).thenReturn(tsdb); when(parent.getConfig()).thenReturn(config); when(parent.getNamespaceCounters()).thenReturn(counters); when(parent.trackMetricPrefix()).thenReturn(true); when(group.getParent()).thenReturn(parent); when(group.getRateLimiter()).thenReturn(rate_limiter); when(group.getGroupID()).thenReturn(GROUPID); when(group.getConsumerType()).thenReturn(TsdbConsumerType.RAW); when(group.getDeserializer()).thenReturn(deserializer); config.overrideConfig(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + "zookeeper.connect", ZKS); stream_list = mock(List.class); when(consumer_connector.createMessageStreamsByFilter( (TopicFilter) any(), anyInt())).thenReturn(stream_list); final KafkaStream<byte[], byte[]> stream = mock(KafkaStream.class); when(stream_list.get(0)).thenReturn(stream); iterator = mock(ConsumerIterator.class); when(stream.iterator()).thenReturn(iterator); when(iterator.hasNext()).thenReturn(true).thenReturn(false); when(iterator.next()).thenReturn(message); PowerMockito.mockStatic(ConsumerConfig.class); PowerMockito.whenNew(ConsumerConfig.class).withAnyArguments() .thenReturn(mock(ConsumerConfig.class)); PowerMockito.mockStatic(Consumer.class); when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))) .thenReturn(consumer_connector); }
@SuppressWarnings("unchecked") public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group, String consumerId) { KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group); String subscriptionPattern = null; Map<String, Integer> topMap = null; try { String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId); ObjectMapper mapper = new ObjectMapper(); TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() { }; Map<String, Object> jsonObj = mapper.reader(typeMap).readValue( topicCountString); if (jsonObj == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); Object pattern = jsonObj.get("pattern"); if (pattern == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); subscriptionPattern = (String) pattern; Object sub = jsonObj.get("subscription"); if (sub == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); topMap = (Map<String, Integer>) sub; } catch (Throwable t) { throw new KafkaZKException(t); } boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern); boolean hasBlackList = blackListPattern.equals(subscriptionPattern); if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) { return new StaticTopicCount(consumerId, topMap); } else { String regex = null; Integer numStreams = -1; for (Entry<String, Integer> entity : topMap.entrySet()) { regex = entity.getKey(); numStreams = entity.getValue(); break; } TopicFilter filter = hasWhiteList ? new Whitelist(regex) : new Blacklist(regex); return new WildcardTopicCount(zkClient, consumerId, filter, numStreams); } }
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String consumerIdString, TopicFilter filter, int numStreams) { return new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams); }
public static TopicFilter of( String topic ) { return new Whitelist(topic); }
@Override public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder, Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) { if (!running) throw new IllegalStateException("Kafka client is no longer running"); if (numThreads < 1) return; // Create the config for this consumer ... final boolean autoCommit = true; final boolean debug = logger.isDebugEnabled(); Properties props = new Properties(); props.putAll(this.consumerConfig); props.put("group.id", groupId); // Create the consumer and iterate over the streams and create a thread to process each one ... ConsumerConnector connector = getOrCreateConnector(props); connector.createMessageStreamsByFilter(topicFilter, numThreads, DEFAULT_DECODER, DEFAULT_DECODER) .forEach(stream -> { this.executor.get().execute(() -> { final ConsumerIterator<byte[], byte[]> iter = stream.iterator(); boolean success = false; while (true) { try { while (running && iter.hasNext()) { // Determine if we're still running after we've received a message ... if ( running ) { MessageAndMetadata<byte[], byte[]> msg = iter.next(); if (debug) { logger.debug("Consuming next message on topic '{}', partition {}, offset {}", msg.topic(), msg.partition(), msg.offset()); } success = consumer.consume(msg.topic(), msg.partition(), msg.offset(), keyDecoder.deserialize(msg.topic(),msg.key()), messageDecoder.deserialize(msg.topic(),msg.message())); logger.debug("Consume message: {}", success); if (success && autoCommit) { logger.debug("Committing offsets"); connector.commitOffsets(); } } } } catch (ConsumerTimeoutException e) { logger.debug("Consumer timed out and continuing"); // Keep going ... } } }); }); }
@Override public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder, Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) { subscribers.add(new Subscriber<>(groupId, topicFilter, numThreads, keyDecoder, messageDecoder, consumer)); }
/** * Create a list of MessageAndTopicStreams containing messages of type T. * * @param topicFilter a TopicFilter that specifies which topics to * subscribe to (encapsulates a whitelist or a blacklist). * @param numStreams the number of message streams to return. * @param keyDecoder a decoder that decodes the message key * @param valueDecoder a decoder that decodes the message itself * @return a list of KafkaStream. Each stream supports an * iterator over its MessageAndMetadata elements. */ public <K, V> List<KafkaStream<K, V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/** * Subscribe to one or more topics. * * @param groupId the identifier of the consumer's group; may not be null * @param topicFilter the filter for the topics; may not be null * @param numThreads the number of threads on which consumers should be called * @param messageDecoder the decoder that should be used to convert the {@code byte[]} message into an object form expected by * the consumer * @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1 */ public <MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<MessageType> messageDecoder, MessageConsumer<String, MessageType> consumer) { subscribe(groupId, topicFilter, numThreads, Serdes.stringDeserializer(), messageDecoder, consumer); }
/** * Subscribe to one or more topics. * * @param groupId the identifier of the consumer's group; may not be null * @param topicFilter the filter for the topics; may not be null * @param numThreads the number of threads on which consumers should be called * @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1 */ public void subscribe(String groupId, TopicFilter topicFilter, int numThreads, MessageConsumer<String, Document> consumer) { subscribe(groupId, topicFilter, numThreads, Serdes.stringDeserializer(), Serdes.document(), consumer); }
/** * Subscribe to one or more topics. * * @param groupId the identifier of the consumer's group; may not be null * @param topicFilter the filter for the topics; may not be null * @param numThreads the number of threads on which consumers should be called * @param keyDecoder the decoder that should be used to convert the {@code byte[]} key into an object form expected by the * consumer * @param messageDecoder the decoder that should be used to convert the {@code byte[]} message into an object form expected by * the consumer * @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1 */ public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder, Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) { logger.debug("NODE: subscribing {} in group '{}' to topics {}",consumer,groupId,topicFilter); messageBus.get().subscribe(groupId, topicFilter, numThreads, keyDecoder, messageDecoder, consumer); }
/** * Subscribe to one or more topics. * * @param groupId the identifier of the consumer's group; may not be null * @param topicFilter the filter for the topics; may not be null * @param numThreads the number of threads on which consumers should be called * @param keyDeserializer the deserializer that should be used to convert the {@code byte[]} key into an object form expected * by the consumer * @param messageDeserializer the deserializer that should be used to convert the {@code byte[]} message into an object form * expected by the consumer * @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1 */ public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDeserializer, Deserializer<MessageType> messageDeserializer, MessageConsumer<KeyType, MessageType> consumer);
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);