The following 50 code examples, extracted from open-source Python projects, illustrate how to use kafka.KafkaProducer().
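Before the project snippets, here is a minimal sketch of the common pattern they all share: build a KafkaProducer with bootstrap_servers, call send() with a bytes (or serialized) value, and flush()/close() before exiting. The broker address localhost:9092, the topic name example-topic, and the key are placeholder assumptions, not taken from any of the projects below.

from kafka import KafkaProducer

# Minimal sketch; broker address and topic name are assumed, not from the examples below.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',           # assumed local broker
    value_serializer=lambda v: v.encode('utf-8')  # serialize str values to bytes
)

# send() is asynchronous; calling .get() on the returned future blocks
# until the broker acknowledges the write (or the timeout expires).
future = producer.send('example-topic', 'hello kafka', key=b'example-key')
record_metadata = future.get(timeout=10)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

producer.flush()  # make sure buffered messages are delivered
producer.close()
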
def simulate():
    """simulate temperature events for machines"""
    LOGGER.setLevel(APPLICATION_LOGGING_LEVEL)
    LOGGER.info("Starting producer")
    LOGGER.debug('Set Logging Level to ' + APPLICATION_LOGGING_LEVEL)
    LOGGER.debug('Writing to Kafka listening at: ' + KAFKA_URI)
    producer = KafkaProducer(bootstrap_servers=KAFKA_URI)
    last_temperatures = {}
    while True:
        for i in range(PUBLISH_NUMBER_OF_SENSORS):
            sensor = 'sensor' + str(i)
            temperature = _get_temperature(sensor, last_temperatures)
            message = MachineTemperature(sensor, temperature, datetime.datetime.utcnow()).to_json()
            producer.send(SENSOR_TEMPERATURE_TOPIC, str.encode(message), key=sensor.encode())
        LOGGER.info(str(PUBLISH_NUMBER_OF_SENSORS) + " messages published")
        time.sleep(PUBLISH_DELAY_IN_SECONDS)

def __init__(self, brokers, n_workers=5, topic_work=mjolnir.kafka.TOPIC_REQUEST,
             topic_result=mjolnir.kafka.TOPIC_RESULT, topic_complete=mjolnir.kafka.TOPIC_COMPLETE,
             max_request_size=4*1024*1024):
    self.brokers = brokers
    self.n_workers = n_workers
    self.topic_work = topic_work
    self.topic_result = topic_result
    self.topic_complete = topic_complete
    # Standard producer for query results
    self.producer = kafka.KafkaProducer(bootstrap_servers=brokers,
                                        max_request_size=max_request_size,
                                        compression_type='gzip',
                                        api_version=mjolnir.kafka.BROKER_VERSION)
    # More reliable producer for reflecting end run sigils. As this
    # is only used for sigils and not large messages like es responses
    # compression is unnecessary here.
    self.ack_all_producer = kafka.KafkaProducer(bootstrap_servers=brokers,
                                                acks='all',
                                                api_version=mjolnir.kafka.BROKER_VERSION)
    # TODO: 10 items? No clue how many is appropriate...10 seems reasonable
    # enough. We want enough to keep the workers busy, but not so many
    # that the committed offsets are significantly ahead of the work
    # actually being performed.
    self.work_queue = Queue.Queue(10)

def kafka(TOPIC=None):
    # Lazy init of the Kafka producer #
    global PRODUCER
    if PRODUCER is None:
        PRODUCER = KafkaProducer(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD)
    try:
        future = PRODUCER.send(TOPIC, request.get_data())
        future.get(timeout=60)
        return "OK", 200, None
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None

def __init__(self, config):
    assert isinstance(config, FetcherConfig), "Wrong configuration"
    log.debug("New fetcher with master_rpc_addr=%s, rpc_addr=%s" % (config.master_rpc_addr, config.fetcher_rpc_addr))
    self._config = config
    self.master_addr = self._config.master_rpc_addr
    if not self.master_addr.startswith("http://"):
        self.master_addr = "http://%s" % self.master_addr
    self._host, self._port = self._config.fetcher_rpc_addr.split(":")
    self._port = int(self._port)
    self._sys_config = self._pull_sys_config_from_master()
    self.isRunning = False
    self.rpcServer = self._create_rpc_server()
    self.producer = KafkaProducer(bootstrap_servers=[self._sys_config.kafka_addr, ])
    self.consumer = KafkaConsumer(bootstrap_servers=[self._sys_config.kafka_addr, ],
                                  auto_offset_reset='earliest',
                                  consumer_timeout_ms=self._sys_config.kafka_consumer_timeout_ms)
    self.downloader = Downloader(clients=self._sys_config.downloader_clients,
                                 timeout=self._sys_config.downloader_timeout)
    self._task_manager = TaskManager(self._sys_config, self._config)
    self._reporter_manager = ReporterManager(self._sys_config)
    self._request_task = {}
    self.taskDict = {}
    self._subscribe_lock = threading.Lock()
    # NOTE: Fetcher?????Master?????
    self._addr = None

def est1_kafka(self):  # kafka queue not testable?
    fixtureA = {"listing_id": 1, "drone": 2, "owner": 2, "description": "please rent myseediestdrone!",
                "time_posted": "2016-10-24T04:28:48.932Z", "price_per_day": 10.0}
    fixtureB = {"listing_id": 2, "drone": 3, "owner": 3, "description": "please rent myforgeddrone!",
                "time_posted": "2016-10-24T04:28:48.991Z", "price_per_day": 14.0}
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    producer.send('new-listings-topic', json.dumps(fixtureA).encode('utf-8'))
    producer.send('new-listings-topic', json.dumps(fixtureB).encode('utf-8'))
    print("going to sleep for 30 seconds...")
    time.sleep(30)  # allow kafka container to think...
    print("finished sleeping!")
    try:
        consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092'])
        self.assertIsNotNone(consumer)
    except:
        print("consumer not formed")
    # consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092'])
    for message in consumer:
        m = json.loads((message.value).decode('utf-8'))
        print(m)

def process_data(self, msg):
    result = 'ok'
    _data = msg['filename'] + ': ' + msg['data']
    self.log.debug(msg['collectors'] + _data)
    producer = KafkaProducer(bootstrap_servers=self.kfk_server)
    future = producer.send(self.topic, _data)
    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # Decide what to do if produce request failed...
        self.log.error(traceback.format_exc())
        result = 'Fail'
    finally:
        producer.close()
    # return record_metadata.topic, record_metadata.partition, record_metadata.offset
    return result,

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('topic')
    parser.add_argument('filename')
    parser.add_argument('--kafka-host')
    parser.add_argument('--validate', action='store_true')
    args = parser.parse_args()
    if args.validate:
        with open(args.filename, 'rt') as f:
            data = json.dumps(json.load(f)).encode('utf8')
    else:
        with open(args.filename, 'rb') as f:
            data = f.read()
    logging.basicConfig(level=logging.ERROR)
    kafka_kwargs = {}
    if args.kafka_host:
        kafka_kwargs['bootstrap_servers'] = args.kafka_host
    producer = KafkaProducer(
        max_request_size=104857600,
        **kafka_kwargs)
    producer.send(args.topic, data)
    producer.flush()
    print('Pushed {} bytes to {}'.format(len(data), args.topic))

def __init__(self, kafka_host=None, model_cls=None, model_kwargs=None, debug=False):
    self.model_cls = model_cls
    self.model_kwargs = model_kwargs or {}
    kafka_kwargs = {}
    if kafka_host is not None:
        kafka_kwargs['bootstrap_servers'] = kafka_host
    self.consumer = KafkaConsumer(
        self.input_topic,
        group_id=self.group_id,
        max_partition_fetch_bytes=self.max_message_size,
        consumer_timeout_ms=100,
        **kafka_kwargs)
    self.producer = KafkaProducer(
        max_request_size=self.max_message_size,
        **kafka_kwargs)
    self.debug = debug

def client(self):
    if not self._client:
        kwargs = self._get_client_config()
        self._client = kafka.KafkaProducer(**kwargs)
    return self._client

def _make_producer(brokers):
    return kafka.KafkaProducer(bootstrap_servers=brokers,
                               compression_type='gzip',
                               api_version=mjolnir.kafka.BROKER_VERSION)

def getKafkaProducer(self):
    try:
        self._producer = kafka.KafkaProducer(bootstrap_servers=['{}:{}'.format(self._ip_address, self._port)])
    except kafka.errors.NoBrokersAvailable as e:
        LOG.error("BroadViewPublisher: NoBrokersAvailable {}".format(e))
    except:
        LOG.error("Unexpected error: {}".format(sys.exc_info()[0]))

def __init__(self, delay, topic, servers='localhost:29092', data_file="",
             random_data=False, file_ip="", continuously=False):
    self._path_to_file = data_file
    self._delay = delay
    self.status = ProducerStatus.Created
    if self._path_to_file:
        self._file = open(data_file, "r")
    self._topic = topic
    self._producer = KafkaProducer(bootstrap_servers=servers)
    self._continuously = continuously
    self._random_data = random_data
    if self._random_data:
        self._random_sflow = GeneratingRandomSFLOW(file_ip)

def producer(self):
    if not self._producer:
        self._producer = KafkaProducer(
            bootstrap_servers=self._bootstrap_servers,
        )
    return self._producer

def __init__(self, broker, topic):
    """
    Instantiate a producer given broker and topic
    :param broker: ip and port number of broker, e.g. 127.0.0.1:9092
    :param topic: name of the topic
    :return: None
    """
    self.producer = KafkaProducer(bootstrap_servers=broker,
                                  value_serializer=lambda v: ujson.dumps(v).encode('utf-8'))
    self.topic = topic
    logger.info("Setup kafka producer at {} with topic {}".format(broker, topic))

def producer_msg(self):
    try:
        producer = KafkaProducer(bootstrap_servers=[self.serverlist])
        producer.send('oa_qian', (self.msg).encode("utf-8"))
        producer.flush()
        producer.close(timeout=60)
        return "success"
    except Exception as e:
        log.error(e)
        return "error"

# if __name__ == '__main__':
#     # test code
#     mq_s = Mq_s('kafka.sunqb.com:9092', 'sunqingbiao;sun;890897;1')
#     mq_s.producer_msg()

def produce_to_kafka(schema, args, config):
    topic = config['kafka']['topic']
    producer = KafkaProducer(bootstrap_servers=config['kafka']['brokers'])

    def f_produce(topic, partition, key, value):
        producer.send(topic, key=key, value=value, partition=partition)

    partition_count = 1 + max(producer.partitions_for(topic))
    try:
        bootstrap(f_produce, partition_count, schema, args.database, args.table, config)
    except KeyboardInterrupt:
        sys.exit(1)
    producer.flush()
    producer.close()

def produce_to_bruce(schema, args, config):
    topic = config['kafka']['topic']
    if args.partition_count:
        partition_count = args.partition_count
    else:
        print('fetch partition info for topic ' + topic)
        producer = KafkaProducer(bootstrap_servers=config['kafka']['brokers'])
        partition_count = 1 + max(producer.partitions_for(topic))
        producer.close()
    socket = bruce.open_bruce_socket()

    # batching socket send
    buff = []

    def flush_buff():
        for msg in buff:
            socket.sendto(msg, '/var/run/bruce/bruce.socket')
        del buff[:]

    def f_produce(topic, partition, key, value):
        if len(buff) < 1000:
            buff.append(bruce.create_msg(partition, topic, bytes(key), bytes(value)))
        else:
            flush_buff()

    try:
        bootstrap(f_produce, partition_count, schema, args.database, args.table, config)
        flush_buff()
    except KeyboardInterrupt:
        sys.exit(1)
    finally:
        socket.close()

def run(self):
    self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
    super(fake_access_producer, self).run()

def run(self):
    self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
    super(fake_error_producer, self).run()

def sink_dstream(self, dstream):
    if self._producer is None:
        self._producer = kafka.KafkaProducer(
            bootstrap_servers="{0}:{1}".format(self._host, self._port))
    dstream.foreachRDD(self._persist)

def __init__(self):
    self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    self.merchants_list = []

def close(self):
    from kafka import KafkaProducer
    self.inter.set()
    producer = KafkaProducer(bootstrap_servers=self.servers)
    producer.send(self.topic, self.end)
    producer.flush()

def publish(topics, message, hostname=None, port_num=None):
    hostname, port_num = insure_host_port(hostname, port_num)
    server = hostname + ':' + str(port_num)
    publisher = KafkaProducer(bootstrap_servers=server)
    for topic in topics:
        publisher.send(topic, message.encode('utf-8'))

def __init__(self, kafka_URI, topic_str):
    self.producer = KafkaProducer(bootstrap_servers=kafka_URI)
    self.topic = topic_str

def get_user_tweets(username):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    consumer = KafkaConsumer('twitterCheckpoint', bootstrap_servers=['kafka:9092'])
    msgSent = producer.send('twitterUser', username.encode('utf-8'))

    # wait for the message to be sent back by the producer
    msgReceived = None
    while not msgReceived:
        for msg in consumer:
            msgReceived = msg.value.decode('utf-8')
            if msgReceived == username:
                return

def get_page_posts(page):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    consumer = KafkaConsumer('fbCheckpoint', bootstrap_servers=['kafka:9092'])
    msgSent = producer.send('fbPage', page.encode('utf-8'))

    # wait for the message to be sent back by the producer
    msgReceived = None
    while not msgReceived:
        for msg in consumer:
            msgReceived = msg.value.decode('utf-8')
            if msgReceived == page:
                return

def get_subreddit_posts(subr):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    consumer = KafkaConsumer('redditCheckpoint', bootstrap_servers=['kafka:9092'])
    msgSent = producer.send('subreddit', subr.encode('utf-8'))

    # wait for the message to be sent back by the producer
    msgReceived = None
    while not msgReceived:
        for msg in consumer:
            msgReceived = msg.value.decode('utf-8')
            if msgReceived == subr:
                return

def get_user_tweets(username):
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    # Twitter API
    api = authenticate_api()
    tweets = 0
    need_update = True
    try:
        for page in Cursor(api.user_timeline, screen_name=username, count=200).pages(16):
            for status in page:
                status = status._json
                msg = producer.send('data', json.dumps(format_tweet(status)).encode('utf-8'))
                tweets += 1
                print(tweets)
                with open('test1.jsonl', 'a') as f:
                    f.write(json.dumps(format_tweet(status)) + '\n')
            # Flush kafka producer
            producer.flush()
            # Follow Twitter's Rate limit
            sleep(2)
    except Exception as e:
        print(e)
        pass
    # Flush kafka producer
    producer.flush()
    return username

def get_reddit_submissions(subreddit):
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    # Reddit API
    reddit = authenticate_api()
    submissions = 0
    try:
        for submission in reddit.subreddit(subreddit).new():
            sub = format_submission(submission)
            if submissions > 1000:
                break
            msg = producer.send('data', json.dumps(sub).encode('utf-8'))
            submissions += 1
            print(submissions)
            with open('test.jsonl', 'a') as f:
                f.write(json.dumps(sub) + '\n')
        # Flush kafka producer
        producer.flush()
    except Exception as e:
        with open('Errors.txt', 'a') as f:
            f.write(str(type(e)) + '\n')
            f.write(str(e) + '\n')
    # Flush kafka producer
    producer.flush()
    return subreddit

def __init__(self, sensor_node_id, kafka_topic='wifi'):
    """
    Wifi Sniffer class to structure/store data from wifi devices.
    Data is collected using Scapy.

    :param sensor_node_id: node id
    :type sensor_node_id: :py:class:`str`
    :param kafka_topic: name of kafka topic
    :type kafka_topic: :py:class:`str`
    """
    Thread.__init__(self)
    self.sensor_node_id = sensor_node_id
    self.kafka_topic = kafka_topic
    self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
    self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])

def __init__(self, sensor_node_id, kafka_topic='bluetooth'):
    """
    Bluetooth Sniffer class to structure/store data from bluetooth devices

    :param sensor_node_id: node id
    :type sensor_node_id: :py:class:`str`
    :param kafka_topic: name of kafka topic
    :type kafka_topic: :py:class:`str`
    """
    Thread.__init__(self)
    self.sensor_node_id = sensor_node_id
    self.kafka_topic = kafka_topic
    self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
    self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])

def __init__(self, sensor_node_id, kafka_topic='environment'):
    """
    Environment Sniffer class to structure/store data from environmental devices.
    Data is received from Arduino.

    :param sensor_node_id: node id
    :type sensor_node_id: :py:class:`str`
    :param kafka_topic: name of kafka topic
    :type kafka_topic: :py:class:`str`
    """
    Thread.__init__(self)
    self.sensor_node_id = sensor_node_id
    self.kafka_topic = kafka_topic
    self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
    self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])

def __init__(self, hosts='127.0.0.1:9092', topic='default', timeout=None,
             compression=None, acks=1, retries=0, job_size=1048576,
             cafile=None, certfile=None, keyfile=None, crlfile=None):
    self._hosts = hosts
    self._topic = topic
    self._timeout = timeout
    self._logger = logging.getLogger('kq')
    self._producer = kafka.KafkaProducer(
        bootstrap_servers=self._hosts,
        compression_type=compression,
        acks=acks,
        retries=retries,
        max_request_size=job_size,
        buffer_memory=max(job_size, 33554432),
        ssl_cafile=cafile,
        ssl_certfile=certfile,
        ssl_keyfile=keyfile,
        ssl_crlfile=crlfile
    )

def producer(self): """Return the Kafka producer object. :return: Kafka producer object. :rtype: kafka.producer.KafkaProducer """ return self._producer
def test_kafka_producer_gc_cleanup():
    threads = threading.active_count()
    producer = KafkaProducer(api_version='0.9')  # set api_version explicitly to avoid auto-detection
    assert threading.active_count() == threads + 1
    del producer
    gc.collect()
    assert threading.active_count() == threads

def run(self):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    self.sent = 0
    while not producer_stop.is_set():
        producer.send('my-topic', self.big_msg)
        self.sent += 1
    producer.flush()

def run(self):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    while True:
        producer.send('my-topic', b"test")
        producer.send('my-topic', b"\xc2Hola, mundo!")
        time.sleep(1)

def start(self):
    try:
        self.producer = kafka.KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    except kafka.errors.NoBrokersAvailable as err:
        log.error(err, exc_info=True)
        raise NapalmLogsException(err)

def run(self):
    producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
                             bootstrap_servers='localhost:9092')
    producer.send('topic3', {'Producer1': 'value1'})
    print('Sent Prod1 Message')
    time.sleep(3)
    self.run()

def run(self):
    producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
                             bootstrap_servers='localhost:9092')
    producer.send('topic3', {'Producer2': 'value2'})
    print('Sent Prod2 Message')
    time.sleep(2)
    self.run()

def run(self):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('topic2', b"Topic2 Test2 Bytes String")
    producer.send('topic2', b"Msg 2!!")
    print('Sent Messages')
    time.sleep(1)

def __init__(self, config):
    kafka_host = config.get('kafka', 'bootstrap_servers')
    # producer
    self.pub_topic = config.get('kafka', 'pub_topic')
    self.producer = KafkaProducer(bootstrap_servers=kafka_host)

def _publish_data(self, data):
    producer = KafkaProducer(bootstrap_servers='kafka.docker',
                             key_serializer=self.key_serializer,
                             value_serializer=self.value_serializer)
    for item in data:
        producer.send(self.topic, key=item['key'], value=item['value'])
    producer.flush()
    producer.close()

def write_data(self, df, host, topic, port):
    producer = KafkaProducer(
        bootstrap_servers=['{}:{}'.format(host, port)],
        key_serializer=pickle.dumps,
        value_serializer=pickle.dumps,
    )
    rows = df.collect()
    for row in rows:
        producer.send(topic, key=row.key, value=row.value)
    producer.flush()
    return len(rows)

def _create(self):
    self._producer = KafkaProducer(bootstrap_servers=self._location,
                                   retries=5,
                                   compression_type=self._compression)

def __init__(self, location, topic_done, partitioner, compression):
    self._location = location
    self._topic_done = topic_done
    self._partitioner = partitioner
    self._compression = compression
    self._producer = KafkaProducer(bootstrap_servers=self._location,
                                   partitioner=partitioner,
                                   retries=5,
                                   compression_type=self._compression)

def __init__(self, output, cluster, topic, diskPath=None, avroSchema=None,
             targetBrokers=None, targetTopic=None, compressType="snappy"):
    """
    Initialize the output handler.

    :param cluster: name of the source Kafka cluster
    :param topic: name of the source Kafka topic
    :param output: output type, "disk" or "kafka"

    disk:
    :param diskPath: directory to write the data file to, default: None
    :param avroSchema: AVRO schema (as a dict literal) used when the source messages are AVRO-encoded, default: None

    kafka:
    :param targetBrokers: comma-separated list of target Kafka brokers, default: None
    :param targetTopic: name of the target Kafka topic, default: None
    :param compressType: compression type used when producing to the target topic,
                         default: snappy, one of ("lz4", "snappy", "gzip")
    """
    self.output = output
    if output == "disk":
        if not diskPath or not os.path.isdir(diskPath):
            raise ValueError("Invalid disk path.")
        filename = "%s_%s.data" % (cluster, topic)
        filepath = os.path.join(diskPath, filename)
        if os.path.exists(filepath):
            raise IOError("File already exists.")
        self.handler = open(filepath, "ab+")
        if avroSchema:
            self.avroSchema = eval(avroSchema)
        else:
            self.avroSchema = None
    elif output == "kafka":
        self.handler = KafkaProducer(bootstrap_servers=targetBrokers.split(","),
                                     compression_type=compressType)
        self.targetTopic = targetTopic
    else:
        raise ValueError("Invalid output parameter.")