/**
 * Creates a topic consumer on top of the old high-level Kafka consumer API.
 *
 * <p>The constructor instantiates the value deserializer named by the
 * {@code value.deserializer} property (when present), opens the consumer connector and
 * builds the fetch and default process thread pools.
 *
 * @param context consumer context providing the Kafka properties, the registered message
 *                handlers and the maximum number of process threads
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
    this.consumerContext = context;

    String deserializerName = null;
    try {
        deserializerName = context.getProperties().getProperty("value.deserializer");
        // No deserializer configured: leave the field untouched, exactly as before.
        if (deserializerName != null) {
            Class<?> deserializerClass = Class.forName(deserializerName);
            deserializer = (Deserializer<Object>) deserializerClass.newInstance();
        }
    } catch (Exception e) {
        // Construction stays best-effort, but a misconfigured class is no longer hidden.
        logger.warn("Unable to instantiate value.deserializer [" + deserializerName + "]", e);
    }

    this.connector = kafka.consumer.Consumer
            .createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));

    // One fetch thread per registered message handler.
    int poolSize = consumerContext.getMessageHandlers().size();
    this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
            poolSize, new StandardThreadFactory("KafkaFetcher"));

    this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
            30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
            new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());

    logger.info(
            "Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ",
            poolSize, context.getMaxProcessThreads());
}
/**
 * Opens a ZooKeeper-based Kafka consumer and prepares an iterator over one stream of the
 * given topic.
 *
 * @param zookeeper ZooKeeper connect string
 * @param topic     topic to read from
 * @param groupId   consumer group id
 */
public KafkaDataProvider(String zookeeper, String topic, String groupId) {
    super(MessageAndMetadata.class);

    final Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", groupId);
    consumerProps.put("zookeeper.session.timeout.ms", "30000");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("fetch.message.max.bytes", "4194304");
    final ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    // A single stream is requested for the topic; it is the first element read back below.
    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);
    final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic);
    final List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topic);
    iter = topicStreams.get(0).iterator();
}
/**
 * Builds the configuration used by the Kafka consumer.
 *
 * @param zookeeper ZooKeeper address including the port
 * @param groupId   consumer group of the Kafka consumer
 * @return a {@code ConsumerConfig} carrying the settings assembled here
 */
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
    final Properties settings = new Properties();
    settings.setProperty("zookeeper.connect", zookeeper);
    settings.setProperty("group.id", groupId);
    settings.setProperty("zookeeper.session.timeout.ms", "400");
    settings.setProperty("zookeeper.sync.time.ms", "200");
    settings.setProperty("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(settings);
}
private ConsumerConfig createConsumerConfig(String groupId, String consumerId) { final Properties props = new Properties(); props.put("zookeeper.connect", fZooKeeper); props.put("group.id", groupId); props.put("consumer.id", consumerId); //props.put("auto.commit.enable", "false"); // additional settings: start with our defaults, then pull in configured // overrides props.putAll(KafkaInternalDefaults); for (String key : KafkaConsumerKeys) { transferSettingIfProvided(props, key, "kafka"); } return new ConsumerConfig(props); }
/**
 * Opens the spout: connects a ZooKeeper-based Kafka consumer, requests a single
 * String/String stream for {@code topic} and stores its iterator in {@code kafkaIterator}.
 *
 * @param map                  topology configuration (not read by this method)
 * @param topologyContext      topology context (not read by this method)
 * @param spoutOutputCollector collector stored for later emits
 * @throws IllegalStateException if Kafka does not return exactly one stream for the topic
 */
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;

    Properties props = new Properties();
    // NOTE(review): the ZooKeeper address comes from the `conf` field, not the `map` argument — confirm.
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(
            topicCountMap,
            new StringDecoder(new VerifiableProperties()),
            new StringDecoder(new VerifiableProperties()));

    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    if (streams == null || streams.size() != 1) {
        // Previously this only logged and then hit a NullPointerException on the null stream.
        log.error("Streams should be of size 1");
        throw new IllegalStateException("Streams should be of size 1 for topic " + topic
                + " but was " + (streams == null ? "absent" : String.valueOf(streams.size())));
    }
    kafkaIterator = streams.get(0).iterator();
}
@SuppressWarnings("unchecked") public void prepare() { Properties props = geneConsumerProp(); for(String topicName : topic.keySet()){ ConsumerConnector consumer = kafka.consumer.Consumer .createJavaConsumerConnector(new ConsumerConfig(props)); consumerConnMap.put(topicName, consumer); } if(distributed!=null){ try { logger.warn("zkDistributed is start..."); zkDistributed = ZkDistributed.getSingleZkDistributed(distributed); zkDistributed.zkRegistration(); } catch (Exception e) { // TODO Auto-generated catch block logger.error("zkRegistration fail:{}",ExceptionUtil.getErrorMessage(e)); } } }
public void reconnConsumer(String topicName){ //停止topic 对应的conn ConsumerConnector consumerConn = consumerConnMap.get(topicName); consumerConn.commitOffsets(true); consumerConn.shutdown(); consumerConnMap.remove(topicName); //停止topic 对应的stream消耗线程 ExecutorService es = executorMap.get(topicName); es.shutdownNow(); executorMap.remove(topicName); Properties prop = geneConsumerProp(); ConsumerConnector newConsumerConn = kafka.consumer.Consumer .createJavaConsumerConnector(new ConsumerConfig(prop)); consumerConnMap.put(topicName, newConsumerConn); addNewConsumer(topicName, topic.get(topicName)); }
public KafkaConsumerConnector(String zk, String groupName) { //Get group id which should be unique for table so as to keep offsets clean for multiple runs. String groupId = "voltdb-" + groupName; //TODO: Should get this from properties file or something as override? Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.commit.enable", "true"); props.put("auto.offset.reset", "smallest"); props.put("rebalance.backoff.ms", "10000"); m_consumerConfig = new ConsumerConfig(props); m_consumer = kafka.consumer.Consumer.createJavaConsumerConnector(m_consumerConfig); }
@Override public CompletionService<Histogram> startConsumers() { final ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // Create message streams final Map<String, Integer> topicMap = new HashMap<>(); topicMap.put(topic, numThreads); final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap); final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // Pass each stream to a consumer that will read from the stream in its own thread. for (final KafkaStream<byte[], byte[]> stream : streams) { executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream)); } return executorCompletionService; }
/**
 * {@inheritDoc}
 *
 * <p>Opens the consumer connector from {@code kafkaProperties}, requests
 * {@code TOPIC_COUNT} streams for the topic and keeps the iterator of the first one.
 */
@Override
public void initialize() throws StreamingException {
    consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProperties));

    final Map<String, Integer> streamsPerTopic = Maps.newHashMap();
    streamsPerTopic.put(topic, TOPIC_COUNT);
    final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            consumerConnector.createMessageStreams(streamsPerTopic);

    final List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topic);
    consumerIterator = topicStreams.get(0).iterator();
}
/**
 * Entry point: validates the {@code id} and {@code zk} settings, connects a Kafka consumer
 * whose group id and topic are both {@code id}, and hands its single stream to
 * {@code consume}.
 *
 * @param args command-line arguments (not read)
 * @throws IllegalStateException if {@code id} or {@code zk} is undefined
 * @throws Exception             propagated from consumption
 */
public static void main(String[] args) throws Exception {
    if (id == null) {
        throw new IllegalStateException("Undefined HC_ID");
    }
    if (zk == null) {
        throw new IllegalStateException("Undefined HC_ZK");
    }

    out.println("Starting " + HttpClient.class.getSimpleName());
    out.println("Using zk:" + zk + ", id:" + id);

    final Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zk);
    consumerProps.put("group.id", id);
    consumerProps.put("zookeeper.session.timeout.ms", "400");
    consumerProps.put("zookeeper.sync.time.ms", "200");
    final ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

    final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    // One stream is requested for the topic named after the id.
    final KafkaStream<byte[], byte[]> stream =
            consumer.createMessageStreams(Collections.singletonMap(id, 1)).get(id).get(0);

    consume(consumer, stream);
}
/**
 * Builds the default consumer configuration.
 *
 * @param zooKeeper ZooKeeper address in {@code server:port} form; must not be null.
 * @param grpId Group id of the Kafka subscriber; must not be null.
 * @return Kafka consumer configuration.
 */
private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
    A.notNull(zooKeeper, "zookeeper");
    A.notNull(grpId, "groupId");

    final Properties dfltProps = new Properties();

    dfltProps.setProperty("zookeeper.connect", zooKeeper);
    dfltProps.setProperty("group.id", grpId);
    dfltProps.setProperty("zookeeper.session.timeout.ms", "400");
    dfltProps.setProperty("zookeeper.sync.time.ms", "200");
    dfltProps.setProperty("auto.commit.interval.ms", "1000");
    dfltProps.setProperty("auto.offset.reset", "smallest");

    return new ConsumerConfig(dfltProps);
}
public List<KafkaStream<byte[], byte[]>> createKafkaStream( String zookeeperConnectString, String topic, int partitions ) { //create consumer Properties consumerProps = new Properties(); consumerProps.put("zookeeper.connect", zookeeperConnectString); consumerProps.put("group.id", "testClient"); consumerProps.put("zookeeper.session.timeout.ms", "6000"); consumerProps.put("zookeeper.sync.time.ms", "200"); consumerProps.put("auto.commit.interval.ms", "1000"); consumerProps.put("consumer.timeout.ms", "500"); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, partitions); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); return consumerMap.get(topic); }
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) { //create consumer Properties consumerProps = new Properties(); consumerProps.put("zookeeper.connect", zookeeperConnectString); consumerProps.put("group.id", "testClient"); consumerProps.put("zookeeper.session.timeout.ms", "6000"); consumerProps.put("zookeeper.sync.time.ms", "200"); consumerProps.put("auto.commit.interval.ms", "1000"); consumerProps.put("consumer.timeout.ms", "500"); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, partitions); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); return consumerMap.get(topic); }
public List<KafkaStream<byte[], byte[]>> consume(String topic) { Properties consumerProperties = TestUtils.createConsumerProperties( ZK_HOST, UUID.randomUUID().toString(), "client", TIMEOUT); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); // not sure why is this 1 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = Consumer.createJavaConsumerConnector (new ConsumerConfig(consumerProperties)).createMessageStreams(topicCountMap); return consumerMap.get(topic); }
/**
 * Logs one message through the logger under test and checks the Avro record that arrives
 * on the Kafka topic.
 */
@Test
public void testKafkaLogAppender() {
    final Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);

    final Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    // The stream iterator is created before the test message is logged.
    final ConsumerIterator<String, Object> iterator = Consumer
            .createJavaConsumerConnector(new ConsumerConfig(consumerProps))
            .createMessageStreams(
                    topicMap,
                    new StringDecoder(null),
                    new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
            .get(topic)
            .get(0)
            .iterator();

    final String testMessage = "I am a test message";
    logger.info(testMessage);

    final MessageAndMetadata<String, Object> received = iterator.next();
    final GenericRecord logLine = (GenericRecord) received.message();

    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));

    final Map<CharSequence, Object> timings = (Map<CharSequence, Object>) logLine.get("timings");
    assertEquals(timings.size(), 1);
    final Map<CharSequence, Object> tags = (Map<CharSequence, Object>) logLine.get("tag");
    assertEquals(tags.size(), 2);
}
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId, String offset) { // http://kafka.apache.org/08/configuration.html Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("group.id", groupId); // Turn off managing the offset in zookeeper and always start at the tail // if we enable this in the future make sure to set 'auto.commit.interval.ms' props.put("auto.commit.enable", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", offset); return new ConsumerConfig(props); }
/**
 * Creates one consumer connector per topic, each in its own topic-specific consumer group.
 *
 * <p>The base {@code group.id} is read once from {@code consumerProps} and every topic gets
 * a private copy of the properties. Previously the shared properties were rewritten inside
 * the loop, so each later topic derived its group id from the already topic-suffixed id of
 * the previous topic, and the caller's object was modified as a side effect.
 *
 * @param topicMap      topics to consume (the keys are the topic names)
 * @param handler       handler stored for the consumed messages
 * @param consumerProps base consumer properties; left unmodified
 */
private void setUpConsumer( Map<String, Integer> topicMap, MessageHandler<?> handler, Properties consumerProps )
{
    _executors = new HashMap<String, ExecutorService>();
    _topicConsumers = new HashMap<String, ConsumerConnector>();

    final String baseGroupId = consumerProps.getProperty( "group.id" );

    for ( String topic : topicMap.keySet() )
    {
        String normalizedTopic = topic.replace( ".", "_" );
        String normalizedConsumerGroupId = getGroupId( baseGroupId, normalizedTopic );

        // clone() keeps both the entries and any default properties of the original.
        Properties topicProps = (Properties) consumerProps.clone();
        topicProps.setProperty( "group.id", normalizedConsumerGroupId );

        LOG.warn( "Consuming topic '" + topic + "' with group.id '" + normalizedConsumerGroupId + "'" );
        LOG.warn( topicProps.toString() );

        ConsumerConfig topicConfig = new ConsumerConfig( topicProps );
        _topicConsumers.put( topic, kafka.consumer.Consumer.createJavaConsumerConnector( topicConfig ) );
    }

    _topicMap = topicMap;
    _handler = handler;
}
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { logger.info("Opened"); this.collector = collector; logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic()); this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId()); Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration); // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise, // zookeeper complains about a conflicted ephemeral node when there is more than one spout // reading from a topic kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId())); ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); }
/**
 * Connects a consumer with a unique group id to the given topic and keeps the single
 * stream and its iterator.
 *
 * <p>The consumer timeout is set through the real {@code consumer.timeout.ms} key. The
 * previous key, {@code _consumer.timeout.ms}, is not a Kafka setting, so no timeout was
 * applied to the iterator.
 *
 * @param zkConnectString ZooKeeper connect string
 * @param topic           topic to consume
 */
KafkaConsumerSuite(String zkConnectString, String topic)
{
    _topic = topic;

    Properties consumeProps = new Properties();
    consumeProps.put("zookeeper.connect", zkConnectString);
    // The nano-time suffix gives every suite instance its own consumer group.
    consumeProps.put("group.id", _topic + "-" + System.nanoTime());
    consumeProps.put("zookeeper.session.timeout.ms", "10000");
    consumeProps.put("zookeeper.sync.time.ms", "10000");
    consumeProps.put("auto.commit.interval.ms", "10000");
    consumeProps.put("consumer.timeout.ms", "10000");

    _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
    _stream = streams.get(0);
    _iterator = _stream.iterator();
}
/**
 * Creates a consumer for a topic using the default configuration found in the
 * {@code /consumer.properties} classpath resource.
 *
 * <p>The resource stream is closed once the properties are loaded. An I/O failure while
 * reading is printed and leaves {@code config} unset.
 *
 * @param topic topic to consume
 * @throws IllegalStateException if {@code /consumer.properties} is not on the classpath
 */
public KafkaConsumer(final String topic) {
    this.topic = topic;

    // Read in the default configuration; try-with-resources closes the stream.
    try (InputStream in = getClass().getResourceAsStream("/consumer.properties")) {
        if (in == null) {
            // Previously this surfaced as an unexplained NullPointerException in Properties.load.
            throw new IllegalStateException(
                    "Default configuration /consumer.properties not found on the classpath");
        }
        Properties props = new Properties();
        props.load(in);
        config = new ConsumerConfig(props);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Creates a consumer for a topic using the configuration stored in a properties file.
 *
 * <p>The file reader is closed once the properties are loaded. A missing or unreadable file
 * is printed and leaves {@code config} unset.
 *
 * @param topic      topic to consume
 * @param configFile path of the properties file holding the consumer configuration
 */
public KafkaConsumer(final String topic, final String configFile) {
    this.topic = topic;

    // try-with-resources closes the reader, which was leaked before.
    try (FileReader reader = new FileReader(configFile)) {
        Properties props = new Properties();
        props.load(reader);
        config = new ConsumerConfig(props);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
KafkaConsumerService(MetricRegistry metrics, ConsumerConfig consumerConfig, Map<String, Integer> topics, Decoder<K> keyDecoder, Decoder<V> valueDecoder, MessageHandler<K, V> messageHandler) { MDC.put("group_id", consumerConfig.groupId()); this.metrics = metrics; this.consumerConfig = consumerConfig; this.topics = topics; this.keyDecoder = keyDecoder; this.valueDecoder = valueDecoder; this.messageHandler = messageHandler; }
/**
 * Demo consumer: connects to a fixed ZooKeeper address, opens one stream for
 * {@code my-topic} and prints every message it receives.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    final String topicName = "my-topic";

    final Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", "10.15.62.76:2181");
    consumerProps.put("group.id", "mygroup001");
    consumerProps.put("zookeeper.session.timeout.ms", "40000");
    consumerProps.put("zookeeper.sync.time.ms", "200");
    consumerProps.put("auto.commit.interval.ms", "1000");

    final ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

    // One stream is requested for the topic.
    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topicName, Integer.valueOf(1));
    System.out.println("zzzzzzzzzzzzz");

    final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            connector.createMessageStreams(streamsPerTopic);
    final List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(topicName);
    final ConsumerIterator<byte[], byte[]> messages = topicStreams.get(0).iterator();

    System.out.println("before while...");
    while (messages.hasNext()) {
        final byte[] payload = messages.next().message();
        System.out.println(new String(payload));
    }
}
/** * Create a Kafka consumer. */ @Override public void open() { // these consumers use ZooKeeper for commit, offset and segment consumption tracking // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency // TODO: use the task details from TopologyContext in the normal open method ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // consumer with just one thread since the real parallelism is handled by Storm already Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaMessageStream stream = consumerMap.get(topic).get(0); consumerIterator = stream.iterator(); }
/**
 * Opens the spout: stores the collector and connects a Kafka consumer that reads one
 * stream of the {@code newtopic} topic.
 *
 * @param conf      topology configuration (not read by this method)
 * @param context   topology context (not read by this method)
 * @param collector collector stored for later emits
 */
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this._collector = collector;

    final String topicName = "newtopic";

    final Properties consumerProps = new Properties();
    consumerProps.put("zk.connect", "10.15.62.104:2181");
    consumerProps.put("groupid", "group1");
    final ConsumerConnector connector = kafka.consumer.Consumer
            .createJavaConsumerConnector(new ConsumerConfig(consumerProps));

    // One stream is requested for the topic.
    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topicName, Integer.valueOf(1));
    final Map<String, List<KafkaMessageStream>> streamsByTopic =
            connector.createMessageStreams(streamsPerTopic);

    final KafkaMessageStream onlyStream = streamsByTopic.get(topicName).get(0);
    this.it = onlyStream.iterator();
}
@SuppressWarnings("unchecked") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this._collector = collector; this.logger = Logger.getLogger(BoltCassandra.class.getClass().getName()); // Construct kafka part Properties props = new Properties(); props.put("zk.connect", "10.15.62.75:2181"); props.put("groupid", "sec-group-1"); // ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("sec-stream-one", new Integer(1)); // Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaMessageStream stream = consumerMap.get("sec-stream-one").get(0); // this.it = stream.iterator(); }
public void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "CadalSec"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Read-common", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Read-common").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecRecPage"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-recPage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPage").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecTagRecPage"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-recPageTagTag", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPageTagTag").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecPersonalPage"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-personalPage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-personalPage").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecTagBook"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-recPageTagBook", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-recPageTagBook").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecPersonalPageUser"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-personalPageUser", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-personalPageUser").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "RecHomePage"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Rec-homePage", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Rec-homePage").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "SearchClick"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Search-click", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Search-click").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "SearchTerm"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Search-query", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Search-query").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }
private void KafkaInit(){ Properties props = new Properties(); props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181"); props.put("group.id", "PersonalReply"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("Personal-reply", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。 Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("Personal-reply").get(0);// 这里只有一个流,所以得get(0)就可以了。 this.it = stream.iterator(); }