@Override public ConsumerConnector newConsumerConnector(String name, ConsumerConfig configOverrides) { Properties mergedProps = new Properties(); Map<String, String> config = configs.get(name); if (config != null) { mergedProps.putAll(config); } if (configOverrides != null) { mergedProps.putAll(configOverrides.createConsumerConfig()); } return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(mergedProps)); }
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { _collector = spoutOutputCollector; Properties props = new Properties(); props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS)); props.put("group.id", groupId); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties())); List<KafkaStream<String, String>> streams = consumerMap.get(topic); KafkaStream<String, String> stream = null; if (streams.size() == 1) { stream = streams.get(0); } else { log.error("Streams should be of size 1"); } kafkaIterator = stream.iterator(); }
@Override public CompletionService<Histogram> startConsumers() { final ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // Create message streams final Map<String, Integer> topicMap = new HashMap<>(); topicMap.put(topic, numThreads); final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap); final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // Pass each stream to a consumer that will read from the stream in its own thread. for (final KafkaStream<byte[], byte[]> stream : streams) { executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream)); } return executorCompletionService; }
/** * {@inheritDoc} */ @Override public void initialize() throws StreamingException { ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(topic, TOPIC_COUNT); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); consumerIterator = stream.iterator(); }
@Provides @KafkaScope ConsumerSchema.Consumer consumer(final IngestionManager ingestionManager) { // XXX: make target group configurable? final IngestionGroup ingestion = ingestionManager.useDefaultGroup(); if (ingestion.isEmpty()) { throw new IllegalStateException("No backends are part of the ingestion group"); } final ConsumerSchema.Depends d = DaggerConsumerSchema_Depends .builder() .primaryComponent(primary) .depends(depends) .dependsModule(new ConsumerSchema.DependsModule(ingestion)) .build(); final ConsumerSchema.Exposed exposed = schema.setup(d); return exposed.consumer(); }
public static ConsumerConnector getConsumer(String groupId) { //加上线程名字的考虑是:保证每个线程只有一个Consumer,但是每个线程又可以有一个独立的Consumer,从而消费不同的partition String consumerKey = groupId + "|" + Thread.currentThread().getName(); ConsumerConnector msgConnector = groupConsumers.get(consumerKey); if (msgConnector == null) { try { consumerLock.lock(); msgConnector = groupConsumers.get(consumerKey); if (msgConnector == null) { msgConnector = Consumer.createJavaConsumerConnector(getConsumerRealConfig(groupId)); groupConsumers.put(consumerKey, msgConnector); } } finally { consumerLock.unlock(); } } return msgConnector; }
public static void main(String[] args) throws Exception { if (id == null) throw new IllegalStateException("Undefined HC_ID"); if (zk == null) throw new IllegalStateException("Undefined HC_ZK"); out.println("Starting " + HttpClient.class.getSimpleName()); out.println("Using zk:" + zk + ", id:" + id); Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("group.id", id); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); KafkaStream<byte[],byte[]> stream = consumer.createMessageStreams(Collections.singletonMap(id, 1)).get(id).get(0); consume(consumer, stream); }
@Bean protected KafkaStream<String, float[]> kafkaStream() { final String topicName = retrieveTopicNameFromGatewayAddress(gatewayUrl()); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig()); Map<String, Integer> topicCounts = new HashMap<>(); topicCounts.put(topicName, 1); VerifiableProperties emptyProps = new VerifiableProperties(); StringDecoder keyDecoder = new StringDecoder(emptyProps); FeatureVectorDecoder valueDecoder = new FeatureVectorDecoder(); Map<String, List<KafkaStream<String, float[]>>> streams = consumerConnector.createMessageStreams(topicCounts, keyDecoder, valueDecoder); List<KafkaStream<String, float[]>> streamsByTopic = streams.get(topicName); Preconditions.checkNotNull(streamsByTopic, String.format("Topic %s not found in streams map.", topicName)); Preconditions.checkElementIndex(0, streamsByTopic.size(), String.format("List of streams of topic %s is empty.", topicName)); return streamsByTopic.get(0); }
public List<KafkaStream<byte[], byte[]>> createKafkaStream( String zookeeperConnectString, String topic, int partitions ) { //create consumer Properties consumerProps = new Properties(); consumerProps.put("zookeeper.connect", zookeeperConnectString); consumerProps.put("group.id", "testClient"); consumerProps.put("zookeeper.session.timeout.ms", "6000"); consumerProps.put("zookeeper.sync.time.ms", "200"); consumerProps.put("auto.commit.interval.ms", "1000"); consumerProps.put("consumer.timeout.ms", "500"); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, partitions); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); return consumerMap.get(topic); }
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) { //create consumer Properties consumerProps = new Properties(); consumerProps.put("zookeeper.connect", zookeeperConnectString); consumerProps.put("group.id", "testClient"); consumerProps.put("zookeeper.session.timeout.ms", "6000"); consumerProps.put("zookeeper.sync.time.ms", "200"); consumerProps.put("auto.commit.interval.ms", "1000"); consumerProps.put("consumer.timeout.ms", "500"); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, partitions); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); return consumerMap.get(topic); }
public List<KafkaStream<byte[], byte[]>> consume(String topic) { Properties consumerProperties = TestUtils.createConsumerProperties( ZK_HOST, UUID.randomUUID().toString(), "client", TIMEOUT); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); // not sure why is this 1 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = Consumer.createJavaConsumerConnector (new ConsumerConfig(consumerProperties)).createMessageStreams(topicCountMap); return consumerMap.get(topic); }
@Test public void testKafkaLogAppender() { Properties consumerProps = new Properties(); consumerProps.put("zookeeper.connect", zookeeper); consumerProps.put("group.id", "kafka-log-appender-test"); consumerProps.put("auto.offset.reset", "smallest"); consumerProps.put("schema.registry.url", schemaRegistry); Map<String, Integer> topicMap = new HashMap<String, Integer>(); topicMap.put(topic, 1); ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)) .createMessageStreams(topicMap, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(consumerProps))) .get(topic).get(0).iterator(); String testMessage = "I am a test message"; logger.info(testMessage); MessageAndMetadata<String, Object> messageAndMetadata = iterator.next(); GenericRecord logLine = (GenericRecord) messageAndMetadata.message(); assertEquals(logLine.get("line").toString(), testMessage); assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId); assertNotNull(logLine.get("source")); assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1); assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2); }
public KtGroup(Config config) { // Because we are not pushing names to zookeeper random names should be fine String groupId = config.getGroupId(); if (groupId == null) { // default to a unique group id groupId = "Kt-" + UUID.randomUUID(); } String offset = "largest"; if (config.getLocation().equals("tail")) { offset = "smallest"; } log.info("Starting consumer at '{}' offset", offset); consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(config.getZookeeper(), groupId, offset)); this.topic = config.getTopic(); }
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { logger.info("Opened"); this.collector = collector; logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic()); this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId()); Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration); // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise, // zookeeper complains about a conflicted ephemeral node when there is more than one spout // reading from a topic kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId())); ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); }
KafkaConsumerSuite(String zkConnectString, String topic) { _topic = topic; Properties consumeProps = new Properties(); consumeProps.put("zookeeper.connect", zkConnectString); consumeProps.put("group.id", _topic+"-"+System.nanoTime()); consumeProps.put("zookeeper.session.timeout.ms", "10000"); consumeProps.put("zookeeper.sync.time.ms", "10000"); consumeProps.put("auto.commit.interval.ms", "10000"); consumeProps.put("_consumer.timeout.ms", "10000"); _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1)); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic); _stream = streams.get(0); _iterator = _stream.iterator(); }
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException { mConfig = config; mOffsetTracker = offsetTracker; mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { throw new RuntimeException("Topic filter and blacklist cannot be both specified."); } TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()): new Whitelist(mConfig.getKafkaTopicFilter()); LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); List<KafkaStream<byte[], byte[]>> streams = mConsumerConnector.createMessageStreamsByFilter(topicFilter); KafkaStream<byte[], byte[]> stream = streams.get(0); mIterator = stream.iterator(); mLastAccessTime = new HashMap<TopicPartition, Long>(); StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId()); mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds(); mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads(); mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); }
/** * 初始化Kafka消费者客户端, 并获取Topic对应的Stream */ private void openKafkaStream() { logger.info("开始初始化Kafka消费客户端"); this.consumer = Consumer.createJavaConsumerConnector(getConsumerConfig()); StringDecoder decoder = new StringDecoder(null); Map<String, Integer> topicCountMap = Maps.of(topic, 1); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, decoder, decoder); List<KafkaStream<String, String>> streams = consumerMap.get(topic); this.stream = streams.get(0); Assert.notNull(stream); }
/** * Create a Kafka consumer. */ @Override public void open() { // these consumers use ZooKeeper for commit, offset and segment consumption tracking // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency // TODO: use the task details from TopologyContext in the normal open method ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // consumer with just one thread since the real parallelism is handled by Storm already Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaMessageStream stream = consumerMap.get(topic).get(0); consumerIterator = stream.iterator(); }
@Before public void setUp() { data = new KafkaConsumerData(); meta = new KafkaConsumerMeta(); meta.setKafkaProperties(getDefaultKafkaProperties()); meta.setLimit(STEP_LIMIT); stepMeta = new StepMeta("KafkaConsumer", meta); transMeta = new TransMeta(); transMeta.addStep(stepMeta); trans = new Trans(transMeta); PowerMockito.mockStatic(Consumer.class); when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))).thenReturn(zookeeperConsumerConnector); when(zookeeperConsumerConnector.createMessageStreams(anyMapOf(String.class, Integer.class))).thenReturn(streamsMap); when(streamsMap.get(anyObject())).thenReturn(stream); when(stream.get(anyInt())).thenReturn(kafkaStream); when(kafkaStream.iterator()).thenReturn(streamIterator); when(streamIterator.next()).thenReturn(generateKafkaMessage()); }
private static Map<String, Location> getVehicleStartPoints() { Map<String, Location> vehicleStartPoint = new HashMap<String, Location>(); Properties props = new Properties(); props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING); props.put("group.id", "DataLoader" + r.nextInt(100)); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "smallest"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, new Integer(1)); KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA) .get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { String message = new String(it.next().message()); try { vehicleStartPoint = objectMapper.readValue(message, new TypeReference<Map<String, Location>>() { }); } catch (IOException e) { e.printStackTrace(); } break; } consumer.shutdown(); return vehicleStartPoint; }
private ConsumerIterator<String, String> buildConsumer(String topic) { Properties props = consumerProperties(); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null)); KafkaStream<String, String> stream = consumers.get(topic).get(0); return stream.iterator(); }
private ConsumerIterator<String, String> buildConsumer(String topic) { Properties props = consumerProperties(); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams( topicCountMap, new StringDecoder(null), new StringDecoder(null)); KafkaStream<String, String> stream = consumers.get(topic).get(0); return stream.iterator(); }
public static ConsumerConnector createConsumerConnector(String zkAddr, String group) { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false"); props.put(KafkaConfig.ZkConnectProp(), zkAddr); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props)); return consumerConnector; }
public DeprecatedConsumer(String topic, Properties props) { this.props.put(ConsumerConstants.ZK_CONNECT, ConsumerConstants.ZK_CLUSTER_LIST); this.props.put(ConsumerConstants.GROUP_ID, ConsumerConstants.GROUPID_KAFKA_TEST); this.props.put(ConsumerConstants.ZK_SESSION_TIMEOUT_MS, "40000"); this.props.put(ConsumerConstants.ZK_SYNC_TIME_MS, "200"); this.props.put(ConsumerConstants.AUTO_COMMIT_INTERVAL_MS, "1000"); this.props.putAll(props); this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); this.topic = topic; }
@Test public void testLogging() throws InterruptedException { final Logger logger = loggerContext.getLogger("ROOT"); unit.start(); assertTrue("appender is started", unit.isStarted()); for (int i = 0; i<1000; ++i) { final LoggingEvent loggingEvent = new LoggingEvent("a.b.c.d", logger, Level.INFO, "message"+i, null, new Object[0]); unit.append(loggingEvent); } final Properties consumerProperties = new Properties(); consumerProperties.put("metadata.broker.list", kafka.getBrokerList()); consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt()); consumerProperties.put("auto.commit.enable","false"); consumerProperties.put("auto.offset.reset","smallest"); consumerProperties.put("zookeeper.connect", kafka.getZookeeperConnection()); final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties); final ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); final KafkaStream<byte[], byte[]> log = javaConsumerConnector.createMessageStreamsByFilter(new Whitelist("logs"),1).get(0); final ConsumerIterator<byte[], byte[]> iterator = log.iterator(); for (int i=0; i<1000; ++i) { final String messageFromKafka = new String(iterator.next().message(), UTF8); assertThat(messageFromKafka, Matchers.equalTo("message"+i)); } }
public ConsumerConnector createClient(Properties consumerProperties) { consumerProperties.put("metadata.broker.list", getBrokerList()); consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt()); consumerProperties.put("auto.commit.enable","false"); consumerProperties.put("auto.offset.reset","smallest"); consumerProperties.put("zookeeper.connect", getZookeeperConnection()); final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties); return Consumer.createJavaConsumerConnector(consumerConfig); }
private static ConsumerIterator<String, String> buildConsumer(String topic) { Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProperties()); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null)); KafkaStream<String, String> stream = consumers.get(topic).get(0); return stream.iterator(); }
public static void main(String[] args) { args = new String[] { Constants.ZK_SERVER, Constants.TOPIC_NAME, "group1", "consumer1" }; if (args == null || args.length != 4) { System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}"); System.exit(1); } String zk = args[0]; String topic = args[1]; String groupid = args[2]; String consumerid = args[3]; Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("group.id", groupid); props.put("client.id", "test"); props.put("consumer.id", consumerid); props.put("auto.offset.reset", "smallest"); props.put("auto.commit.enable", "true"); props.put("auto.commit.interval.ms", "60000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> interator = stream1.iterator(); while (interator.hasNext()) { MessageAndMetadata<byte[], byte[]> messageAndMetadata = interator.next(); String message = String.format( "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s", messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(), messageAndMetadata.offset(), new String(messageAndMetadata.key() != null ? messageAndMetadata.key() : "".getBytes()), new String(messageAndMetadata.message())); System.out.println(message); } }
public void collect() throws Exception{ Properties properties = new Properties(); //class name을 user_id, grup_id로 사용함 properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST); properties.put("group.id",group_id); properties.put("zookeeper.session.timeout.ms", "6000"); properties.put("zookeeper.sync.time.ms", "2000"); properties.put("auto.commit.enable", "true"); properties.put("auto.commit.interval.ms", "5000"); properties.put("fetch.message.max.bytes", "31457280"); // 30MB properties.put("auto.offset.reset", "smallest"); //properties.put("auto.offset.reset", "largest"); final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TOPIC, NUM_THREADS); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC); ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); log.debug("NUM_THREADS : "+NUM_THREADS); //for (final KafkaStream<byte[], byte[]> stream : streams) { // executor.execute(new ConsumerT(stream)); //} for (int m = 0; m < NUM_THREADS; m++) { executor.execute(new ConsumerT(streams.get(m))); } }
public void init(){ Properties props = new Properties(); props.put("zookeeper.connect", Constants.kfZkServers); props.put("group.id", Constants.kfGroupId); props.put("auto.offset.reset", Constants.kfAutoOffsetReset); props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = Consumer.createJavaConsumerConnector(config); }
public KafkaConsumer(final String topic, final String zkConnect, final String groupId, Decoder<Val> decoder){ consumer = kafka.consumer.Consumer.createJavaConsumerConnector( new ConsumerConfig(getConsumerConfig(zkConnect, groupId))); Map<String, Integer> topicCountMap = new HashMap(){{ put(topic, new Integer(1)); }}; Map<String, List<KafkaStream<byte[], Val>>> consumerMap = consumer.createMessageStreams(topicCountMap, new DefaultDecoder(null), decoder); stream = consumerMap.get(topic).get(0); }
public OldConsumer(String topic, Properties consumerProperties) { _connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<String, String>>> kafkaStreams = _connector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null)); _iter = kafkaStreams.get(topic).get(0).iterator(); }
public static void main(String[] args) { args = new String[] { "zookeeper0:2181/kafka", "topic1", "group2", "consumer1" }; if (args == null || args.length != 4) { System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}"); System.exit(1); } String zk = args[0]; String topic = args[1]; String groupid = args[2]; String consumerid = args[3]; Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("group.id", groupid); props.put("client.id", "test"); props.put("consumer.id", consumerid); props.put("auto.offset.reset", "largest"); props.put("auto.commit.enable", "false"); props.put("auto.commit.interval.ms", "60000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> interator = stream1.iterator(); while (interator.hasNext()) { MessageAndMetadata<byte[], byte[]> messageAndMetadata = interator.next(); String message = String.format( "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s", messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(), messageAndMetadata.offset(), new String(messageAndMetadata.key()), new String(messageAndMetadata.message())); System.out.println(message); consumerConnector.commitOffsets(); } }
public KafkaEventConsumer(ActorSystem system) { logger.debug("Initilizing Consumer"); hazelcastEventActor = system.actorOf(Props.create(HazelcastEventActor.class), "hazelcastEventHandler"); mangoDBEventActor = system.actorOf(Props.create(MongoDBEventActor.class), "mongoDbEventHandler"); Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "eventProcessor"); props.put("client.id", "workerEventProcessor"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); logger.debug("Intialized Consumer"); }
protected void initKafka() { if (handler == null) { log.error("Exectuor can't be null!"); throw new RuntimeException("Exectuor can't be null!"); } log.info("Consumer properties:" + properties); ConsumerConfig config = new ConsumerConfig(properties); isAutoCommitOffset = config.autoCommitEnable(); log.info("Auto commit: " + isAutoCommitOffset); consumerConnector = Consumer.createJavaConsumerConnector(config); Map<String, Integer> topics = new HashMap<String, Integer>(); topics.put(topic, streamNum); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder( new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> streamsMap = consumerConnector .createMessageStreams(topics, keyDecoder, valueDecoder); streams = streamsMap.get(topic); log.info("Streams:" + streams); if (streams == null || streams.isEmpty()) { log.error("Streams are empty."); throw new IllegalArgumentException("Streams are empty."); } streamThreadPool = Executors.newFixedThreadPool(streamNum); }
/** * Read topic to list, only using Kafka code. */ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) { ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config); // we request only one stream per consumer instance. Kafka will make sure that each consumer group // will see each message only once. Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1); Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap); if (streams.size() != 1) { throw new RuntimeException("Expected only one message stream but got " + streams.size()); } List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName); if (kafkaStreams == null) { throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString()); } if (kafkaStreams.size() != 1) { throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams"); } LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator(); List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>(); int read = 0; while (iteratorToRead.hasNext()) { read++; result.add(iteratorToRead.next()); if (read == stopAfter) { LOG.info("Read " + read + " elements"); return result; } } return result; }
/** * Read topic to list, only using Kafka code. */ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) { ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config); // we request only one stream per consumer instance. Kafka will make sure that each consumer group // will see each message only once. Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1); Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap); if (streams.size() != 1) { throw new RuntimeException("Expected only one message stream but got "+streams.size()); } List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName); if (kafkaStreams == null) { throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString()); } if (kafkaStreams.size() != 1) { throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams"); } LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator(); List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>(); int read = 0; while(iteratorToRead.hasNext()) { read++; result.add(iteratorToRead.next()); if (read == stopAfter) { LOG.info("Read "+read+" elements"); return result; } } return result; }