我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用kafka.KafkaConsumer()。
def check_kafka_msg(topic='events', nbr_msg=100): ## Collect Messages from Bus consumer = KafkaConsumer( bootstrap_servers=get_external_ip()+':'+str(KAFKA_BROKER_PORT), auto_offset_reset='earliest') consumer.subscribe([topic]) counter = 0 for message in consumer: counter = counter + 1 if counter == nbr_msg: break return counter
def _python_kafka_consumer(self, topic): """ Populate total message dump into console """ self.topic = topic # KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False) KafkaConsumer(auto_offset_reset='latest', enable_auto_commit=False, group_id=None) consumer = KafkaConsumer(topic, bootstrap_servers=self.brokers) for msg in consumer: return msg
def test_kafka_fixture(self): consumer = KafkaConsumer( self.topic, bootstrap_servers='kafka.docker:9092', key_deserializer=lambda item: json.loads(item.decode('utf-8')), value_deserializer=lambda item: json.loads(item.decode('utf-8')), auto_offset_reset='earliest', ) actual_data = [] for i in range(5): message = next(consumer) data = {'key': message.key, 'value': message.value} actual_data.append(data) expected_data = self.spark.read.json( absolute_path(__file__, 'resources', 'test_fixtures', 'kafka.json') ) self.assertDataFrameEqual(expected_data, actual_data)
def get_offset_start(brokers, topic=mjolnir.kafka.TOPIC_RESULT): """Find the current ending offset for all partitions in topic. By calling this prior to producing requests we know all responses come after these offsets. TODO: This naming doesn't feel right... Parameters ---------- brokers : list of str topic : str Returns ------- list of int """ consumer = kafka.KafkaConsumer(bootstrap_servers=brokers, api_version=mjolnir.kafka.BROKER_VERSION) parts = consumer.partitions_for_topic(topic) if parts is None: return None partitions = [kafka.TopicPartition(topic, p) for p in parts] consumer.assign(partitions) return [consumer.position(p) for p in partitions]
def consume(): """Consumes events from partitionlag topic""" LOGGER.setLevel(APPLICATION_LOGGING_LEVEL) LOGGER.info("Starting lagreader") LOGGER.debug('Set Logging Level to ' + APPLICATION_LOGGING_LEVEL) LOGGER.debug('Listening on Kafka at: ' + KAFKA_URI) consumer = KafkaConsumer(group_id='lagConsumerGroup', bootstrap_servers=KAFKA_URI) consumer.subscribe(topics=['partitionlag']) partition_lag_dict = PartitionLagDict() last_writetime = datetime.datetime.now() for msg in consumer: jsonstring = msg.value partitionlag = PartitionLag.from_json(jsonstring) partition_lag_dict.addPartitionLag(partitionlag) LOGGER.debug(str(partitionlag.eventdate) + " Received partitionlag event: " \ + "partition: " + str(partitionlag.partition) \ + " lag: " + str(partitionlag.lag)) LOGGER.debug(str(datetime.datetime.now()) + ' Received partitionlag: ' \ + partition_lag_dict.toString()) last_writetime = _notifylag_conditionally(partition_lag_dict, last_writetime)
def getOffsets(self, topic, partitions, group): """ ??topic?partition?group, ??offsets?? """ try: # ????zookeeper-storage api??offsets?? # ?????group?offsets?????UnknownTopicOrPartitionError?? tp = self.client.send_offset_fetch_request(group, [OffsetRequestPayload(topic, p, -1, 1) for p in partitions]) offsets = {p.partition: p.offset for p in tp} except UnknownTopicOrPartitionError: # ???????kafka-storage api??offsets?? consumer = KafkaConsumer(group_id=group, bootstrap_servers=self.broker, enable_auto_commit=False) tp = [TopicPartition(topic, p) for p in partitions] consumer.assign(tp) offsets = {p.partition: consumer.position(p) for p in tp} return offsets
def consumer_msg(serverlist): consumer = KafkaConsumer('oa_qian', group_id='my-group', bootstrap_servers=[serverlist]) for message in consumer: msg = bytes.decode(message.value) msglist = msg.split(";") username = msglist[0] password = msglist[1] key = msglist[2] type = msglist[3] oa = Attendance(username, password, key) if (type == '1'): result = oa.singin() print result if(result == 'success'): print oa.log_record(username, type) else: result = oa.singout() print result if(result == 'success'): print oa.log_record(username, type)
def __init__(self, config): assert isinstance(config, FetcherConfig), "Wrong configuration" log.debug("New fetcher with master_rpc_addr=%s, rpc_addr=%s" % (config.master_rpc_addr, config.fetcher_rpc_addr)) self._config = config self.master_addr = self._config.master_rpc_addr if not self.master_addr.startswith("http://"): self.master_addr = "http://%s" % self.master_addr self._host, self._port = self._config.fetcher_rpc_addr.split(":") self._port = int(self._port) self._sys_config = self._pull_sys_config_from_master() self.isRunning = False self.rpcServer = self._create_rpc_server() self.producer = KafkaProducer(bootstrap_servers=[self._sys_config.kafka_addr, ]) self.consumer = KafkaConsumer(bootstrap_servers=[self._sys_config.kafka_addr, ], auto_offset_reset='earliest', consumer_timeout_ms=self._sys_config.kafka_consumer_timeout_ms) self.downloader = Downloader(clients=self._sys_config.downloader_clients, timeout=self._sys_config.downloader_timeout) self._task_manager = TaskManager(self._sys_config, self._config) self._reporter_manager = ReporterManager(self._sys_config) self._request_task = {} self.taskDict = {} self._subscribe_lock = threading.Lock() # NOTE: Fetcher?????Master????? self._addr = None
def est1_kafka(self): # kafka queue not testable? fixtureA = {"listing_id":1,"drone": 2, "owner": 2, "description": "please rent myseediestdrone!", "time_posted": "2016-10-24T04:28:48.932Z", "price_per_day": 10.0} fixtureB = {"listing_id":2,"drone": 3, "owner": 3, "description": "please rent myforgeddrone!", "time_posted": "2016-10-24T04:28:48.991Z", "price_per_day": 14.0} producer = KafkaProducer(bootstrap_servers='kafka:9092') producer.send('new-listings-topic', json.dumps(fixtureA).encode('utf-8')) producer.send('new-listings-topic', json.dumps(fixtureB).encode('utf-8')) print("going to sleep for 30 seconds...") time.sleep(30) # allow kafka container to think... print("finished sleeping!") try: consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092']) self.assertIsNotNone(consumer) except: print("consumer not formed") #consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092']) for message in consumer: m = json.loads((message.value).decode('utf-8')) print(m)
def __init__(self, hosts='127.0.0.1:9092', cafile=None, certfile=None, keyfile=None, crlfile=None): self._hosts = hosts self._consumer = kafka.KafkaConsumer( bootstrap_servers=self._hosts, ssl_cafile=cafile, ssl_certfile=certfile, ssl_keyfile=keyfile, ssl_crlfile=crlfile, consumer_timeout_ms=-1, enable_auto_commit=True, auto_offset_reset='latest', )
def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest') consumer.subscribe(['my-topic']) self.valid = 0 self.invalid = 0 for message in consumer: if len(message.value) == msg_size: self.valid += 1 else: self.invalid += 1 if consumer_stop.is_set(): break consumer.close()
def main(): # consumer = KafkaConsumer("topic3", group_id="group1", # bootstrap_servers='localhost:9092') # for message in consumer: # print(message) # print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.timestamp / 1000.0))) threads = [ Consumer() ] for t in threads: t.start() logger.info('Thread started') # logger.info('Sleeping for 100 seconds') #time.sleep(100)
def test_write_kafka_dataframe(self): self.expected_data.write_ext.kafka( 'kafka.docker', self.topic, key_serializer=self.json_encoder, value_serializer=self.json_encoder, ) consumer = KafkaConsumer( self.topic, bootstrap_servers='kafka.docker:9092', key_deserializer=lambda item: json.loads(item.decode('utf-8')), value_deserializer=lambda item: json.loads(item.decode('utf-8')), auto_offset_reset='earliest', ) actual_data = [] for i in range(self.expected_data.count()): message = next(consumer) data = {'key': message.key, 'value': message.value} actual_data.append(data) self.assertDataFrameEqual(self.expected_data, actual_data)
def __init__(self, location, topic, group, partition_id): self._location = location self._group = group self._topic = topic self._consumer = KafkaConsumer( bootstrap_servers=self._location, group_id=self._group, max_partition_fetch_bytes=10485760, consumer_timeout_ms=100, client_id="%s-%s" % (self._topic, str(partition_id) if partition_id is not None else "all"), request_timeout_ms=120 * 1000, ) if partition_id is not None: self._partition_ids = [TopicPartition(self._topic, partition_id)] self._consumer.assign(self._partition_ids) else: self._partition_ids = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)] self._consumer.subscribe(topics=[self._topic]) if self._consumer._use_consumer_group(): self._consumer._coordinator.ensure_coordinator_known() self._consumer._coordinator.ensure_active_group() self._consumer._update_fetch_positions(self._partition_ids) self._start_looping_call()
def python_kafka_consumer_performance(topic=topic): print("\n>>> Connect Kafka in {} by kafka-python as consumer". format(bootstrap_servers)) consumer = KafkaConsumer( bootstrap_servers=bootstrap_servers, auto_offset_reset = 'earliest', # start at earliest topic group_id = None # do no offest commit ) msg_consumed_count = 0 consumer_start = time.time() consumer.subscribe([topic]) for msg in consumer: msg_consumed_count += 1 if msg_consumed_count >= msg_count: break consumer_timing = time.time() - consumer_start consumer.close() return consumer_timing
def __init__(self, kafka_host=None, model_cls=None, model_kwargs=None, debug=False): self.model_cls = model_cls self.model_kwargs = model_kwargs or {} kafka_kwargs = {} if kafka_host is not None: kafka_kwargs['bootstrap_servers'] = kafka_host self.consumer = KafkaConsumer( self.input_topic, group_id=self.group_id, max_partition_fetch_bytes=self.max_message_size, consumer_timeout_ms=100, **kafka_kwargs) self.producer = KafkaProducer( max_request_size=self.max_message_size, **kafka_kwargs) self.debug = debug
def client(self): if not self._client: kwargs = self._get_client_config() self._client = kafka.KafkaConsumer(**kwargs) tps = self._get_topic_partitions() self._client.assign(tps) backend = get_offset_backend() for tp in tps: backend.seek(self, tp.topic, tp.partition) self._client.committed(tp) return self._client
def check_kafka_is_running(): # Verify we can connect to Kafka time.sleep(2) consumer = KafkaConsumer(bootstrap_servers=get_external_ip()+':'+str(KAFKA_BROKER_PORT), auto_offset_reset='earliest') mytopic = consumer.topics() return 1
def _get_result_offsets(self): """Get the latest offsets for all partitions in topic""" consumer = kafka.KafkaConsumer(bootstrap_servers=self.brokers, auto_offset_reset='latest', api_version=mjolnir.kafka.BROKER_VERSION) partitions = [kafka.TopicPartition(self.topic_result, p) for p in consumer.partitions_for_topic(self.topic_result)] consumer.assign(partitions) consumer.seek_to_end() offsets = [consumer.position(tp) for tp in partitions] consumer.close() return offsets
def consumer(self): params = {} if self._group_id: params['group_id'] = self._group_id if self._auto_offset_reset: params['auto_offset_reset'] = self._auto_offset_reset params['bootstrap_servers'] = self._bootstrap_servers if not self._consumer: self._consumer = KafkaConsumer(**params) self._consumer.subscribe(self._receive_from) return self._consumer
def __init__(self, broker, topic): self.consumer = KafkaConsumer(topic, bootstrap_servers=broker)
def main(conf): # Enable to topics/feeds topics = [ 'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector', 'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node', 'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix', 'openbmp.parsed.l3vpn' ] # Read config file with open(conf, 'r') as f: config_content = yaml.load(f) bootstrap_server = config_content['bootstrap_servers'] try: # connect and bind to topics print ("Connecting to kafka... takes a minute to load offsets and topics, please wait") consumer = kafka.KafkaConsumer( *topics, bootstrap_servers=bootstrap_server, client_id="dev-testing" + str(time.time()), group_id="dev-testing" + str(time.time()), enable_auto_commit=True, auto_commit_interval_ms=1000, auto_offset_reset="largest" ) print ("Now consuming/waiting for messages...") for m in consumer: process_message(m) except kafka.common.KafkaUnavailableError as err: print ("Kafka Error: %s" % str(err)) except KeyboardInterrupt: print ("User stop requested")
def main(): parser = argparse.ArgumentParser(description='Feed Apache Samza metrics into Prometheus.') parser.add_argument('--brokers', metavar='BROKERS', type=str, required=True, help='list of comma-separated kafka brokers: host[:port],host[:port],...') parser.add_argument('--port', metavar='PORT', type=int, nargs='?', default=8080, help='port to serve metrics to Prometheus (default: 8080)') parser.add_argument('--topic', metavar='TOPIC', type=str, nargs='?',default='samza-metrics', help='name of topic to consume (default: "samza-metrics")') parser.add_argument('--from-beginning', action='store_const', const=True, help='consume topic from offset 0') parser.add_argument('--include-jobs-regex', metavar='INCLUDE_JOBS_REGEX', type=str, nargs='?', default='.*', help='only include jobs which match the given regex') parser.add_argument('--ttl', metavar='GAUGES_TTL', type=int, nargs='?', help='time in seconds after which a metric (or label set) is no longer reported when not updated (default: 60s)') args = parser.parse_args() brokers = args.brokers.split(',') consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers) start_http_server(args.port) set_gauges_ttl(args.ttl) if args.from_beginning: consumer.seek_to_beginning() start_ttl_watchdog_thread() try: consume_topic(consumer, args.brokers, re.compile(args.include_jobs_regex)) except KeyboardInterrupt: pass # FIXME : should we close consumer ? print('Shutting down')
def intranet_topic(): consumer = KafkaConsumer('haproxy_logs', group_id='haproxy_logs', bootstrap_servers=bootstrap_servers) for message in consumer: Msg = message.value.strip() try: tt = time.strftime('%Y%m%d', time.localtime()) H_key = 'haproxy2_topic_%s' % tt if Msg: val = Msg.split('{') if len(val) >= 2: Topic = val[1].split('}')[0] Rtime = val[0].split()[8] Rtime = int(Rtime.split('/')[4]) if ':' in Topic: Topic = str(Topic.split(':')[0]) if '|' in Topic: Topic = str(Topic.split('|')[0]) if '.baihe.com' in Topic: Key = 'haproxy2_logs_%s_%s' % (tt,Topic) Rt_Key = 'Rtime2_%s_%s' % (tt,Topic) for KEY in (H_key,Key,Rt_Key): rc.expire(KEY, 86400) rc.sadd(H_key,Topic) rc.incr(Key) if Rtime: rc.lpush(Rt_Key, Rtime) except Exception as e: #loging.write() continue sys.exit()
def run_consumer(self): '''core consumer code''' bootstrap_server = self.config.get('consumer', 'kafka_bootstrap') consumer_group = self.config.get('consumer', 'kafka_consumer_group') offset_reset = self.config.get( 'consumer', 'kafka_auto_offset_reset') self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server,\ consumer_timeout_ms=60000,\ group_id=consumer_group,\ auto_offset_reset=offset_reset) topic_whitelist = self.config.get( 'consumer', 'topic_whitelist') self.logger.info("Topic list is " + topic_whitelist) self.consumer.subscribe(topic_whitelist.split(","), None, self) self.logger.info("Consumer " + self.consumer_id + " starting.... " + str(self.consumer.assignment())) signal.signal(signal.SIGINT, self.exit_gracefully) signal.signal(signal.SIGTERM, self.exit_gracefully) while not self.shutting_down: for message in self.consumer: consumer_message = MessageInfo(message.topic, message.partition, message.key,\ message.value, message.offset) self.process_message(consumer_message) if self.shutting_down: break self.check_for_rotation() for part in self.partitions: self.partitions[part].writer.close() self.logger.info("Graceful shutdown of consumer " + str(self.consumer_id) + " successful")
def __init__(self, topic, host=None, decoder=None): self.log = log.getChild(self.__class__.__name__) if host is not None: additional = dict(bootstrap_servers=[host]) else: additional = dict() self._consumer = KafkaConsumer(topic, consumer_timeout_ms=KAFKA_CONSUMER_TIMEOUT_MS, **additional) self._stream_queue = make_stream_queue(_as_closable_stream(self._consumer)) self._all = [] self._decoder = decoder
def poll(topic, offset=0, hostname=None, port_num=None, max_timeout=100): hostname, port_num = insure_host_port(hostname, port_num) server = hostname+':'+str(port_num) topic_partition = TopicPartition(topic, partition) consumer = KafkaConsumer(bootstrap_servers=server, group_id=None) consumer.assign([topic_partition]) consumer.seek(topic_partition, offset) msgs = consumer.poll(max_timeout).values() consumer.close() if len(msgs) > 0: return msgs[0] else: return {}
def __init__(self, kafka_URI,topic_str): consumer = KafkaConsumer(bootstrap_servers=kafka_URI, auto_offset_reset='earliest', enable_auto_commit=False) consumer.subscribe([topic_str]) self.consumer = consumer
def get_user_tweets(username): producer = KafkaProducer(bootstrap_servers='kafka:9092') consumer = KafkaConsumer('twitterCheckpoint', bootstrap_servers=['kafka:9092']) msgSent = producer.send('twitterUser', username.encode('utf-8')) # wait for msg to be send back by producer msgReceived = None while not msgReceived: for msg in consumer: msgReceived = msg.value.decode('utf-8') if msgReceived==username: return
def get_page_posts(page): producer = KafkaProducer(bootstrap_servers='kafka:9092') consumer = KafkaConsumer('fbCheckpoint', bootstrap_servers=['kafka:9092']) msgSent = producer.send('fbPage', page.encode('utf-8')) # wait for msg to be send back by producer msgReceived = None while not msgReceived: for msg in consumer: msgReceived = msg.value.decode('utf-8') if msgReceived==page: return
def get_subreddit_posts(subr): producer = KafkaProducer(bootstrap_servers='kafka:9092') consumer = KafkaConsumer('redditCheckpoint', bootstrap_servers=['kafka:9092']) msgSent = producer.send('subreddit', subr.encode('utf-8')) # wait for msg to be send back by producer msgReceived = None while not msgReceived: for msg in consumer: msgReceived = msg.value.decode('utf-8') if msgReceived==subr: return
def main(): # Enable to topics/feeds topics = [ 'openbmp.parsed.unicast_prefix' # 'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector', # 'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node', # 'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix' ] # Read config file with open('config.yaml', 'r') as f: config_content = yaml.load(f) bootstrap_server = config_content['bootstrap_servers'] try: # connect and bind to topics print ("Connecting to kafka... takes a minute to load offsets and topics, please wait") consumer = kafka.KafkaConsumer( *topics, bootstrap_servers=bootstrap_server, client_id="dev-testing" + str(time.time()), group_id="dev-testing" + str(time.time()), enable_auto_commit=True, auto_commit_interval_ms=1000, auto_offset_reset="largest" ) print ("Now consuming/waiting for messages...") for m in consumer: process_message(m) except kafka.common.KafkaUnavailableError as err: print(("Kafka Error: %s" % str(err))) except KeyboardInterrupt: print ("User stop requested")
def main(): # Enable to topics/feeds topics = [ 'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector', 'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node', 'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix', 'openbmp.parsed.l3vpn' ] # Read config file with open('config.yaml', 'r') as f: config_content = yaml.load(f) bootstrap_server = config_content['bootstrap_servers'] try: # connect and bind to topics print("Connecting to kafka... takes a minute to load offsets and topics, please wait") consumer = kafka.KafkaConsumer( *topics, bootstrap_servers=bootstrap_server, client_id="dev-testing" + str(time.time()), group_id="dev-testing" + str(time.time()), enable_auto_commit=True, auto_commit_interval_ms=1000, auto_offset_reset="largest" ) print("Now consuming/waiting for messages...") for m in consumer: process_message(m) except kafka.common.KafkaUnavailableError as err: print("Kafka Error: %s" % str(err)) except KeyboardInterrupt: print("User stop requested")
def __init__(self, mongo_db, topic): """ Fetching data from kafka and insert it to mongodb :param mongo_db: mongodb cursor :type mongo_db: cursor :param topic: name of kafka topic :type topic: :py:class:`str` """ Thread.__init__(self) self.kafka_consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest') self.mongo_db = mongo_db self.topic = topic
def __init__(self, hosts='127.0.0.1:9092', topic='default', timeout=None, callback=None, job_size=1048576, cafile=None, certfile=None, keyfile=None, crlfile=None, proc_ttl=5000, offset_policy='latest'): self._hosts = hosts self._topic = topic self._timeout = timeout self._callback = callback self._pool = None self._proc_ttl = proc_ttl self._logger = logging.getLogger('kq') self._consumer = kafka.KafkaConsumer( self._topic, group_id=self._topic, bootstrap_servers=self._hosts, max_partition_fetch_bytes=job_size * 2, ssl_cafile=cafile, ssl_certfile=certfile, ssl_keyfile=keyfile, ssl_crlfile=crlfile, consumer_timeout_ms=-1, enable_auto_commit=False, auto_offset_reset=offset_policy, )
def consumer(self): """Return the Kafka consumer object. :return: Kafka consumer object. :rtype: kafka.consumer.KafkaConsumer """ return self._consumer
def test_session_timeout_larger_than_request_timeout_raises(self): with self.assertRaises(KafkaConfigurationError): KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
def test_fetch_max_wait_larger_than_request_timeout_raises(self): with self.assertRaises(KafkaConfigurationError): KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
def test_subscription_copy(self): consumer = KafkaConsumer('foo', api_version=(0, 10)) sub = consumer.subscription() assert sub is not consumer.subscription() assert sub == set(['foo']) sub.add('fizz') assert consumer.subscription() == set(['foo'])
def start(self): ''' Startup the kafka consumer. ''' log.debug('Creating the consumer using the bootstrap servers: %s and the group ID: %s', self.bootstrap_servers, self.group_id) try: self.consumer = kafka.KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.group_id) except kafka.errors.NoBrokersAvailable as err: log.error(err, exc_info=True) raise ListenerException(err) log.debug('Subscribing to the %s topic', self.topic) self.consumer.subscribe(topics=[self.topic])
def run(self): consumer = KafkaConsumer( 'topic2', bootstrap_servers='localhost:9092') #, #auto_offset_reset='earliest') #consumer.subscribe(['topic1']) for message in consumer: print(message) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.timestamp / 1000.0)))
def run(self): consumer = KafkaConsumer(bootstrap_servers=self.kafka_host, group_id=self.group_id, auto_offset_reset='latest') consumer.subscribe(self.sub_topics) for message in consumer: for s in self.subscribers: if s.should_call(message): self.executor.submit(s.call, message)
def __init__(self, CC_obj: object, auto_offset_reset: str="largest"): """ :param CC_obj: :param auto_offset_reset: """ self.configuration = CC_obj.configuration self.hostIP = self.configuration['kafkaserver']['host'] self.hostPort = self.configuration['kafkaserver']['port'] self.consumer = KafkaConsumer(bootstrap_servers=str(self.hostIP)+":"+str(self.hostPort), api_version=(0,10), auto_offset_reset=auto_offset_reset)
def __init__(self): self._consumer = KafkaConsumer( bootstrap_servers=kafka_bootstrap_server, group_id=self._consumer_group_id, auto_offset_reset = self._offset_reset, enable_auto_commit=True )
def __init__(self, topics, consumer_group, offset_reset = 'earliest', auto_commit = False): self._consumer_thread = Thread(target=self.do_consume) self._consumer_thread.setDaemon(True) self._auto_commit = auto_commit self._consumer = KafkaConsumer( bootstrap_servers=kafka_bootstrap_server, group_id=consumer_group, auto_offset_reset = offset_reset, enable_auto_commit= auto_commit ) self._consumer.subscribe(topics)
def getConnection(self): self.connection=KafkaConsumer(bootstrap_servers=[self.broker+":"+self.port]) # Kafka consumer connection
def _acquire(self, partitions): if not self.consumer: self.consumer = KafkaConsumer(partitions, **self.config) else: self.consumer.set_topic_partitions(partitions) if self.post_rebalance_callback: self.post_rebalance_callback(partitions) # set_topic_partitions causes a metadata request, which may fail on the # first try.