我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用kafka.SimpleProducer()。
def test_producer_sync_fail_on_error(self): error = FailedPayloadsError('failure') with patch.object(SimpleClient, 'load_metadata_for_topics'): with patch.object(SimpleClient, 'ensure_topic_exists'): with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]): with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): client = SimpleClient(MagicMock()) producer = SimpleProducer(client, async=False, sync_fail_on_error=False) # This should not raise (response,) = producer.send_messages('foobar', b'test message') self.assertEqual(response, error) producer = SimpleProducer(client, async=False, sync_fail_on_error=True) with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message')
def __init__(self, api): self.api = api super(tweepy.StreamListener, self).__init__() client = KafkaClient("localhost:9092") self.producer = SimpleProducer(client, async = True, batch_send_every_n = 1000, batch_send_every_t = 10)
def __init__(self, topic, server, client=None, **kwargs): try: import kafka except ImportError: raise OutputError('Lack of kafka module, try to execute `pip install kafka-python>=1.3.1` install it') client = client or kafka.SimpleClient self._producer = None self._topic = topic try: self._kafka = client(server, **kwargs) except Exception, e: raise OutputError('kafka client init failed: %s' % e) self.producer(kafka.SimpleProducer) super(Kafka, self).__init__()
def __init__(self, name, host='web14', port=51092, **kwargs): QueueBase.QueueBase.__init__(self, name, host, port) self.__queue = [] self.__kafka = KafkaClient('%s:%d' % (host, port)) self.__producer = SimpleProducer(self.__kafka, async=kwargs.get('async', False)) self.__producer.client.ensure_topic_exists(self.name) self.__consumer = SimpleConsumer(self.__kafka, self.name + '_consumer', self.name, auto_commit_every_n=1)
def produce_example_msg(topic, num_messages=1): kafka = KafkaToolClient(KAFKA_URL) producer = SimpleProducer(kafka) for i in range(num_messages): try: producer.send_messages(topic, b'some message') except LeaderNotAvailableError: # Sometimes kafka takes a bit longer to assign a leader to a new # topic time.sleep(10) producer.send_messages(topic, b'some message')
def test_topic_message_types(self): client = MagicMock() def partitions(topic): return [0, 1] client.get_partition_ids_for_topic = partitions producer = SimpleProducer(client, random_start=False) topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called
def make_kafka_producer(kafka_znode): kafka_brokers = get_kafka_brokers(kafka_znode) kafka_client = KafkaClient(kafka_brokers) return SimpleProducer( kafka_client, async=False, req_acks=1, random_start=True )
def __init__(self, settings): self.settings = settings self.client = KafkaClient(settings.get("KAFKA_HOSTS")) self.producer = SimpleProducer(self.client) self.producer.send_messages = failedpayloads_wrapper( settings.get("KAFKA_RETRY_TIME", 5))(self.producer.send_messages) super(KafkaHandler, self).__init__()