/**
 * Reads the consumer-offsets topic and caches the decoded commits.
 * <p>
 * Opens a single stream on {@code CONSUMER_OFFSET_TOPIC} and loops forever.
 * For every record whose key starts with a 16-bit value below 2, the key is
 * decoded with {@code readMessageKey}, the payload with
 * {@code readMessageValue}, and the pair is stored in
 * {@code kafkaConsumerOffsets}. Records with a {@code null} key or payload
 * are skipped.
 *
 * @see Thread#run()
 */
@Override
public void run() {
    ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(CONSUMER_OFFSET_TOPIC, 1);
    KafkaStream<byte[], byte[]> offsetMsgStream =
            consumerConnector.createMessageStreams(topicCountMap).get(CONSUMER_OFFSET_TOPIC).get(0);
    ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
    while (true) {
        MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
        byte[] rawKey = offsetMsg.key();
        byte[] rawValue = offsetMsg.message();
        if (rawKey == null || rawValue == null) {
            // Nothing to decode; skip instead of failing on ByteBuffer.wrap(null).
            continue;
        }
        try {
            // The leading short is read inside the try so that a key shorter than
            // two bytes is reported like any other decode failure instead of
            // terminating the loop.
            if (ByteBuffer.wrap(rawKey).getShort() >= 2) {
                continue;
            }
            GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(rawKey));
            kafka.common.OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(rawValue));
            kafkaConsumerOffsets.put(commitKey, commitValue);
        } catch (Exception e) {
            // Best effort: report the bad record and keep consuming.
            e.printStackTrace();
        }
    }
}
public MessageAndMetadata getNextMessage(String topic) { List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // it has only a single stream, because there is only one consumer KafkaStream stream = streams.get(0); final ConsumerIterator<byte[], byte[]> it = stream.iterator(); int counter = 0; try { if (it.hasNext()) { return it.next(); } else { return null; } } catch (ConsumerTimeoutException e) { logger.error("0 messages available to fetch for the topic " + topic); return null; } }
/**
 * Logs 1000 numbered messages through {@code logger} and checks that the same
 * texts are read back, in order, from the stream matching the "logs" whitelist.
 */
@Test
public void testLogging() throws InterruptedException {
    final int messageCount = 1000;
    int sent = 0;
    while (sent < messageCount) {
        logger.info("message" + sent);
        ++sent;
    }

    final KafkaStream<byte[], byte[]> logStream =
            kafka.createClient().createMessageStreamsByFilter(new Whitelist("logs"), 1).get(0);
    final ConsumerIterator<byte[], byte[]> received = logStream.iterator();

    for (int expected = 0; expected < messageCount; ++expected) {
        final byte[] payload = received.next().message();
        final String messageFromKafka = new String(payload, UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message" + expected));
    }
}
/**
 * Consumes {@code transducer_topic} with string decoders and hands each
 * message to {@code transducerDataProcessor}, pausing one second between
 * messages. The loop ends when the iterator has no next element, when
 * {@code bStartConsume} is false, or when the thread is interrupted while
 * sleeping.
 */
@Override
public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(transducer_topic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(transducer_topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();

    while (it.hasNext() && bStartConsume) {
        transducerDataProcessor.newData(it.next().message());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag and stop: an interrupt is a request to
            // terminate, and continuing would immediately re-trigger it.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
void consume() throws Exception { // specify the number of consumer threads Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafkaProducer.TOPIC, new Integer(threadsNum)); // specify data decoder StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer .createMessageStreams(topicCountMap, keyDecoder, valueDecoder); // 三个String分别为TOPIC、Key、Value // acquire data List<KafkaStream<String, String>> streams = consumerMap.get(KafkaProducer.TOPIC); // multi-threaded consume executor = Executors.newFixedThreadPool(threadsNum); //create a thread pool for (final KafkaStream<String, String> stream : streams) { executor.submit(new ConsumerThread(stream)); // run thread } }
/**
 * Builds a consumer connector from the given ZooKeeper address and group id,
 * opens one stream on {@code topic} and keeps that stream's iterator in
 * {@code iter}.
 *
 * @param zookeeper value for the {@code zookeeper.connect} property
 * @param topic     topic to open a single stream on
 * @param groupId   value for the {@code group.id} property
 */
public KafkaDataProvider(String zookeeper, String topic, String groupId) {
    super(MessageAndMetadata.class);

    Properties consumerProps = new Properties();
    consumerProps.put("group.id", groupId);
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("zookeeper.session.timeout.ms", "30000");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("fetch.message.max.bytes", "4194304");
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic);
    List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topic);
    iter = topicStreams.get(0).iterator();
}
public void nextTuple() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TopologyConfig.kafkaTopic, 1);//one excutor - one thread Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic); ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); while(true){ while(iter.hasNext()){ String s = new String(iter.next().message()); collector.emit(new Values(s)); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, new Values(s)); } try { Thread.sleep(1000L); } catch (InterruptedException e) { logger.error("Spout : sleep wrong \n", e); } } }
/** * 启动 MessageReceiver,开始监听topic消息 */ @Override public void start() { if (consumer == null) { //sync init synchronized (lock) { init(); } } String topicString = buildTopicsString(); Whitelist topicFilter = new Whitelist(topicString); List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions); if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList)) try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { log.warn(e.getMessage(), e); } processStreamsByTopic(topicString, streamList); }
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) { // init stream thread pool ExecutorService streamPool = Executors.newFixedThreadPool(partitions); String[] topics = StringUtils.split(topicKeys, ","); if (log.isDebugEnabled()) log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions); //遍历stream AtomicInteger index = new AtomicInteger(0); for (KafkaStream<byte[], byte[]> stream : streamList) { Thread streamThread = new Thread() { @Override public void run() { int i = index.getAndAdd(1); if (log.isDebugEnabled()) log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i); ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator(); processStreamByConsumer(topicKeys, consumerIterator); } }; streamPool.execute(streamThread); } }
/** * KafkaConsumer() is constructor. It has following 4 parameters:- * @param topic * @param group * @param id * @param cc * */ public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) { fTopic = topic; fGroup = group; fId = id; fConnector = cc; fCreateTimeMs = System.currentTimeMillis(); fLastTouch = fCreateTimeMs; fLogTag = fGroup + "(" + fId + ")/" + fTopic; offset = 0; state = KafkaConsumer.State.OPENED; final Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(fTopic, 1); final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector .createMessageStreams(topicCountMap); final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic); fStream = streams.iterator().next(); }
public void collectMq(){ Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(Constants.kfTopic, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0); ConsumerIterator<String, String> it = stream.iterator(); MessageAndMetadata<String, String> msgMeta; while (it.hasNext()){ msgMeta = it.next(); super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message()); //System.out.println(msgMeta.key()+"\t"+msgMeta.message()); } }
public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerThread(consumer, stream, threadNumber)); threadNumber++; } }
/**
 * Asks the connector for {@code threadNum} decoded streams of {@code topic}
 * and submits one {@code ConsumerWorker} per stream to a new fixed-size pool
 * stored in {@code executor}.
 *
 * @param keyDecoder   decoder applied to message keys
 * @param valueDecoder decoder applied to message values
 */
public void run(Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, threadNum);

    Map<String, List<KafkaStream<K, V>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);
    List<KafkaStream<K, V>> topicStreams = streamsByTopic.get(topic);

    executor = Executors.newFixedThreadPool(threadNum);

    // Workers are numbered 0, 1, 2, ... in stream order.
    int nextWorkerNo = 0;
    for (final KafkaStream<K, V> stream : topicStreams) {
        executor.submit(new ConsumerWorker<K, V>(stream, nextWorkerNo++));
    }
}
/**
 * Opens one string-decoded stream on {@code TOPIC} and starts a thread named
 * "kafkaMessageReceiverThread" that prints each message and appends it to
 * {@code messagesReceived}; {@code null} messages are recorded as "&lt;null&gt;".
 */
private void consumeMessages() {
    final StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());

    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(TOPIC, 1);

    final Map<String, List<KafkaStream<String, String>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, stringDecoder, stringDecoder);
    final ConsumerIterator<String, String> messages = streamsByTopic.get(TOPIC).get(0).iterator();

    final Runnable receiveLoop = () -> {
        while (messages.hasNext()) {
            String text = messages.next().message();
            if (text == null) {
                text = "<null>";
            }
            System.out.println("got message: " + text);
            messagesReceived.add(text);
        }
    };

    new Thread(receiveLoop, "kafkaMessageReceiverThread").start();
}
/**
 * Moves the client from {@code INIT} to {@code RUNNING} and starts one message
 * task per stream on {@code streamThreadPool}. A sequential task is used when
 * {@code fixedThreadNum} is 0, a concurrent task otherwise.
 *
 * @throws IllegalStateException if the client is not in the {@code INIT} state
 */
public void startup() {
    if (status != Status.INIT) {
        final String alreadyStarted = "The client has been started.";
        log.error(alreadyStarted);
        throw new IllegalStateException(alreadyStarted);
    }
    status = Status.RUNNING;

    log.info("Streams num: " + streams.size());
    tasks = new ArrayList<AbstractMessageTask>();

    for (KafkaStream<String, String> stream : streams) {
        final AbstractMessageTask task;
        if (fixedThreadNum == 0) {
            task = new SequentialMessageTask(stream, handler);
        } else {
            task = new ConcurrentMessageTask(stream, handler, fixedThreadNum);
        }
        tasks.add(task);
        streamThreadPool.execute(task);
    }
}
public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<>(); //这个版本的kafka的这个消费线程填写要注意。否则收不到数据 topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 启动所有线程 executor = Executors.newFixedThreadPool(a_numThreads); // 开始消费消息 int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } }
/**
 * Stores the collector, creates a consumer connector and keeps the iterator
 * of the single string-decoded stream of {@code topic} in
 * {@code kafkaIterator}.
 * <p>
 * The ZooKeeper address is read from the {@code conf} field; the {@code map}
 * argument is not read by this method.
 *
 * @param map                  not used here
 * @param topologyContext      not used here
 * @param spoutOutputCollector collector stored in {@code _collector}
 * @throws IllegalStateException if the connector does not return exactly one
 *                               stream for {@code topic}
 */
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;

    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(
            topicCountMap,
            new StringDecoder(new VerifiableProperties()),
            new StringDecoder(new VerifiableProperties()));
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);

    if (streams == null || streams.size() != 1) {
        log.error("Streams should be of size 1");
        // Fail with a descriptive error instead of dereferencing a null stream.
        throw new IllegalStateException("Expected exactly one Kafka stream for topic " + topic
                + " but got " + (streams == null ? "none" : String.valueOf(streams.size())));
    }
    kafkaIterator = streams.get(0).iterator();
}
/**
 * Creates {@code threads} streams for {@code topic} on the connector
 * registered in {@code consumerConnMap}, submits one {@code Consumer} per
 * stream to a new fixed-size pool, and records that pool in
 * {@code executorMap} under the topic name.
 *
 * @param topic   topic to consume; also the key into both maps
 * @param threads number of streams requested and size of the thread pool
 */
public void addNewConsumer(String topic, Integer threads) {
    ConsumerConnector connector = consumerConnMap.get(topic);

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, threads);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            connector.createMessageStreams(streamsPerTopic);
    List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topic);

    ExecutorService workerPool = Executors.newFixedThreadPool(threads);
    for (final KafkaStream<byte[], byte[]> topicStream : topicStreams) {
        workerPool.submit(new Consumer(topicStream, this));
    }
    executorMap.put(topic, workerPool);
}
/** * Run. * * @param numThreads the num threads */ public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerDemoThread(stream, threadNumber, this)); threadNumber++; } }
/** * Run. * * @param a_numThreads the a num threads */ public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new AvroConsumerThread(stream, threadNumber)); threadNumber++; } }
/** * Run. * * @param numThreads the num threads */ public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new AvroConsumerWeatherThread(stream, threadNumber)); threadNumber++; } }
/** * Run. * * @param numThreads the num threads */ public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new AvroConsumerWaveThread(stream, threadNumber)); threadNumber++; } }
/** * Run. * * @param a_numThreads the a num threads */ public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerDemoThread(stream, threadNumber, this)); threadNumber++; } }
/**
 * Creates the consumer connector, opens one string-decoded stream on
 * {@code topic} and prints the key and message of every record to standard
 * output until the iterator reports no next element.
 */
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap =
            consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = streamMap.get(topic).get(0);

    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());
    }
}
/**
 * Creates the consumer connector, opens one stream on {@code topic} and emits
 * each message as a tuple of (message text, 1, formatted receive time), using
 * the message text as the message id.
 */
public void activate() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    System.out.println("*********Results********topic:" + topic);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    // The pattern never changes, so the formatter is built once instead of once
    // per message. It is a local and therefore not shared with other threads.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");

    // NOTE(review): this loop runs inside activate() until hasNext() is false;
    // confirm that blocking here is intended.
    while (it.hasNext()) {
        String value = new String(it.next().message());
        String str = formatter.format(new Date());
        System.out.println("storm接收到来自kafka的消息--->" + value);
        collector.emit(new Values(value, 1, str), value);
    }
}
/**
 * Sends three messages ({@code TEST_MESSAGE} suffixed with 01, 02, 03) through
 * a {@code KafkaProducer} and checks that they are consumed in the same order
 * from the "test" topic.
 */
@Test
public void test_producer() throws Exception {
    final String topic = "test";
    final String[] suffixes = {"01", "02", "03"};

    ProducerProperties producerProps = new ProducerProperties();
    producerProps.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);

    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, producerProps);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    // Publish everything first ...
    for (String suffix : suffixes) {
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + suffix).getBytes()));
    }

    // ... then read it back and compare in order.
    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    for (String suffix : suffixes) {
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + suffix));
    }

    producer.disconnect().sync();
}
public String getDataFromKafka(int a_numThreads) throws Exception { Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(TestConstants.topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TestConstants.topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // Future fut = null; int threadNumber = 0; ConsumerTest consumerTest = null; for (final KafkaStream stream : streams) { consumerTest = new ConsumerTest(stream, threadNumber); fut = executor.submit(consumerTest); threadNumber++; } Object result = fut.get(); return consumerTest.getResult(); }
/** * Modified example from kafka site with some defensive checks added. */ private ConsumerIterator<String, String> getStreamIterator() { Map<String, Integer> topicCountMap = ImmutableMap.of(topic, TOPIC_COUNT); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, msgDecoder); List<KafkaStream<String, String>> streams = consumerMap.get(topic); Preconditions.checkNotNull(streams, "There is no topic named : " + topic); //copy in case of live list returned. Needed for index check below. ImmutableList<KafkaStream<String, String>> streamsCopy = ImmutableList.copyOf(streams); Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, streamsCopy.size(), "Failed to find any KafkaStreams related to topic : " + topic); KafkaStream<String, String> stream = streamsCopy.get(FIRST_ELEMENT_INDEX); Preconditions.checkNotNull(stream, "Returned kafka stream is null"); ConsumerIterator<String, String> iterator = stream.iterator(); Preconditions.checkNotNull(iterator, "Returned kafka iterator is null"); return iterator; }
/**
 * Asks the connector for {@code threadNum} streams of {@code topic}, logs how
 * many were returned, and submits one {@code ConsumerWorker} per stream to a
 * new fixed-size pool stored in {@code executor}.
 *
 * @param threadNum number of streams requested and size of the thread pool
 */
private void run(int threadNum) {
    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, threadNum);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic);
    List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topic);

    executor = Executors.newFixedThreadPool(threadNum);
    _log.info("the streams size is {}", topicStreams.size());

    int nextWorkerNo = 0;
    for (final KafkaStream<byte[], byte[]> topicStream : topicStreams) {
        executor.submit(new ConsumerWorker(topicStream, nextWorkerNo++));
        // NOTE(review): offsets are committed right after each submit, before the
        // worker is known to have processed anything - confirm this is intended.
        consumer.commitOffsets();
    }
}
/**
 * Stores the schema repository and stream, takes the next consumer id from
 * {@code nextId}, connects to the cluster at {@code Config.cassandraUrl},
 * logs the cluster's hosts, opens a session and then runs
 * {@code createSchemas()} and {@code createInserts()}.
 *
 * @param repo   schema repository kept for later use
 * @param stream Kafka stream kept for later use
 */
public CassandaAvroConsumer(SchemaRepository repo, KafkaStream stream) {
    this.id = nextId++;
    this.stream = stream;
    this.repo = repo;

    this.cluster = Cluster.builder().addContactPoint(Config.cassandraUrl).build();

    // NOTE(review): the debug messages use %s placeholders; check that this
    // matches the placeholder syntax of the logger behind `log`.
    Metadata clusterMetadata = cluster.getMetadata();
    log.debug("Connected to cluster: %s\n", clusterMetadata.getClusterName());
    for (Host node : clusterMetadata.getAllHosts()) {
        log.debug("Datatacenter: %s; Host: %s; Rack: %s\n",
                node.getDatacenter(), node.getAddress(), node.getRack());
    }

    this.session = cluster.connect();

    createSchemas();
    createInserts();
}
@Override public CompletionService<Histogram> startConsumers() { final ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // Create message streams final Map<String, Integer> topicMap = new HashMap<>(); topicMap.put(topic, numThreads); final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap); final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // Pass each stream to a consumer that will read from the stream in its own thread. for (final KafkaStream<byte[], byte[]> stream : streams) { executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream)); } return executorCompletionService; }
public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } }
public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } }
/**消费消息 [指定Topic] * * @param topicName 队列名称 * @param groupId Group Name * @return */ static MsgIterator consume(String topicName, String groupId) { ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair topicCountMap.put(topicName, new Integer(1)); //TODO: 可消费多个topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair KafkaStream<byte[], byte[]> stream = streamList.get(0); //KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型 ConsumerIterator<byte[], byte[]> it = stream.iterator(); MsgIterator iter = new MsgIterator(it); return iter; }
/**消费消息 [指定Topic] 指定线程 * * @param topicName 队列名称 * @param numStreams Number of streams to return * @return A list of MsgIterator each of which provides an iterator over message over allowed topics */ static List<MsgIterator> consume(String topicName, int numStreams, String groupId) { ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair topicCountMap.put(topicName, numStreams); //TODO: 可消费多个topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair List<MsgIterator> iterList = new ArrayList<MsgIterator>(); for (KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); MsgIterator iter = new MsgIterator(it); iterList.add(iter); } //KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型 return iterList; }
public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerThread(stream, receiveGeoEvent, threadNumber)); threadNumber++; } }