The following 21 code examples, extracted from open-source Python projects, illustrate how to use kafka.SimpleClient().
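Before the examples, a minimal sketch of constructing the client. SimpleClient is the legacy client API in kafka-python; the broker address and topic name below are placeholder assumptions, not values from the examples.

# Minimal sketch: construct a SimpleClient, make sure a topic exists,
# and list its partition ids. 'localhost:9092' and 'my-topic' are assumed.
from kafka import SimpleClient

client = SimpleClient(hosts='localhost:9092')      # a list like ['host1:9092', 'host2:9092'] also works
client.ensure_topic_exists('my-topic')             # blocks until metadata reports the topic
print(list(client.topic_partitions['my-topic']))   # partition ids known for the topic
client.close()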
def test_send_broker_unaware_request_fail(self, load_metadata, conn):
    mocked_conns = {
        ('kafka01', 9092): MagicMock(),
        ('kafka02', 9092): MagicMock()
    }
    for val in mocked_conns.values():
        mock_conn(val, success=False)

    def mock_get_conn(host, port, afi):
        return mocked_conns[(host, port)]
    conn.side_effect = mock_get_conn

    client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092'])

    req = KafkaProtocol.encode_metadata_request()
    with self.assertRaises(KafkaUnavailableError):
        client._send_broker_unaware_request(payloads=['fake request'],
                                            encoder_fn=MagicMock(return_value='fake encoded message'),
                                            decoder_fn=lambda x: x)

    for key, conn in six.iteritems(mocked_conns):
        conn.send.assert_called_with('fake encoded message')
def test_get_leader_for_unassigned_partitions(self, protocol, conn):
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (NO_LEADER, 'topic_no_partitions', []),
        (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])

    self.assertDictEqual({}, client.topics_to_brokers)

    with self.assertRaises(LeaderNotAvailableError):
        client._get_leader_for_partition('topic_no_partitions', 0)

    with self.assertRaises(UnknownTopicOrPartitionError):
        client._get_leader_for_partition('topic_unknown', 0)
def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])

    requests = [ProduceRequestPayload(
        "topic_doesnt_exist", 0,
        [create_message("a"), create_message("b")])]

    with self.assertRaises(FailedPayloadsError):
        client.send_produce_request(requests)
def test_producer_sync_fail_on_error(self):
    error = FailedPayloadsError('failure')
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        with patch.object(SimpleClient, 'ensure_topic_exists'):
            with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
                with patch.object(SimpleClient, '_send_broker_aware_request', return_value=[error]):

                    client = SimpleClient(MagicMock())
                    producer = SimpleProducer(client, async=False, sync_fail_on_error=False)

                    # This should not raise
                    (response,) = producer.send_messages('foobar', b'test message')
                    self.assertEqual(response, error)

                    producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
                    with self.assertRaises(FailedPayloadsError):
                        producer.send_messages('foobar', b'test message')
def _python_kafka_partitionoffset(self, topic):
    """
    Return offset and partition of the topic
    """
    topic = self.topic
    client = SimpleClient(self.brokers)

    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)

    for r in offsets_responses:
        print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
def _python_kafka_offsetcount(self, topic):
    """
    Count the total number of offsets in the topic
    """
    client = SimpleClient(self.brokers)
    self.topic = topic

    partitions = client.topic_partitions[self.topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)

    totaloffset = 0
    for r in offsets_responses:
        totaloffset = totaloffset + r.offsets[0]
    return totaloffset
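In the two helpers above, OffsetRequestPayload(topic, p, -1, 1) asks each partition for its latest offset; a time of -2 requests the earliest instead (compare kafka_get_topics_offsets further down). A hedged sketch of counting the messages still retained in a topic, assuming a broker at localhost:9092, a topic named 'my-topic', and kafka-python 1.x import paths:

# Retained message count = sum over partitions of (latest offset - earliest offset).
from kafka import SimpleClient
from kafka.structs import OffsetRequestPayload  # assumed import path (kafka-python 1.x)

client = SimpleClient('localhost:9092')
partitions = client.get_partition_ids_for_topic('my-topic')
latest = client.send_offset_request(
    [OffsetRequestPayload('my-topic', p, -1, 1) for p in partitions])
earliest = client.send_offset_request(
    [OffsetRequestPayload('my-topic', p, -2, 1) for p in partitions])
print(sum(end.offsets[0] - start.offsets[0] for start, end in zip(earliest, latest)))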
def __init__(self, topic, server, client=None, **kwargs):
    try:
        import kafka
    except ImportError:
        raise OutputError('Lack of kafka module, try to execute `pip install kafka-python>=1.3.1` to install it')

    client = client or kafka.SimpleClient
    self._producer = None
    self._topic = topic
    try:
        self._kafka = client(server, **kwargs)
    except Exception as e:
        raise OutputError('kafka client init failed: %s' % e)
    self.producer(kafka.SimpleProducer)
    super(Kafka, self).__init__()
def test_init_with_list(self):
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])

    self.assertEqual(
        sorted([('kafka01', 9092, socket.AF_UNSPEC),
                ('kafka02', 9092, socket.AF_UNSPEC),
                ('kafka03', 9092, socket.AF_UNSPEC)]),
        sorted(client.hosts))
def test_init_with_csv(self):
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')

    self.assertEqual(
        sorted([('kafka01', 9092, socket.AF_UNSPEC),
                ('kafka02', 9092, socket.AF_UNSPEC),
                ('kafka03', 9092, socket.AF_UNSPEC)]),
        sorted(client.hosts))
def test_init_with_unicode_csv(self):
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')

    self.assertEqual(
        sorted([('kafka01', 9092, socket.AF_UNSPEC),
                ('kafka02', 9092, socket.AF_UNSPEC),
                ('kafka03', 9092, socket.AF_UNSPEC)]),
        sorted(client.hosts))
def test_send_broker_unaware_request(self):
    mocked_conns = {
        ('kafka01', 9092): MagicMock(),
        ('kafka02', 9092): MagicMock(),
        ('kafka03', 9092): MagicMock()
    }
    # inject BrokerConnection side effects
    mock_conn(mocked_conns[('kafka01', 9092)], success=False)
    mock_conn(mocked_conns[('kafka03', 9092)], success=False)
    future = Future()
    mocked_conns[('kafka02', 9092)].send.return_value = future
    mocked_conns[('kafka02', 9092)].recv.side_effect = lambda: future.success('valid response')

    def mock_get_conn(host, port, afi):
        return mocked_conns[(host, port)]

    # patch to avoid making requests before we want it
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn):
            client = SimpleClient(hosts='kafka01:9092,kafka02:9092')
            resp = client._send_broker_unaware_request(payloads=['fake request'],
                                                       encoder_fn=MagicMock(),
                                                       decoder_fn=lambda x: x)

            self.assertEqual('valid response', resp)
            mocked_conns[('kafka02', 9092)].recv.assert_called_once_with()
def test_has_metadata_for_topic(self, protocol, conn):
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (NO_LEADER, 'topic_still_creating', []),
        (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
        (NO_ERROR, 'topic_noleaders', [
            (NO_LEADER, 0, -1, [], []),
            (NO_LEADER, 1, -1, [], []),
        ]),
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])

    # Topics with no partitions return False
    self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
    self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))

    # Topic with partition metadata, but no leaders return True
    self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
def test_ensure_topic_exists(self, decode_metadata_response, conn):
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (NO_LEADER, 'topic_still_creating', []),
        (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
        (NO_ERROR, 'topic_noleaders', [
            (NO_LEADER, 0, -1, [], []),
            (NO_LEADER, 1, -1, [], []),
        ]),
    ]
    decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])

    with self.assertRaises(UnknownTopicOrPartitionError):
        client.ensure_topic_exists('topic_doesnt_exist', timeout=1)

    with self.assertRaises(KafkaTimeoutError):
        client.ensure_topic_exists('topic_still_creating', timeout=1)

    # This should not raise
    client.ensure_topic_exists('topic_noleaders', timeout=1)
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
    "Get leader for partitions reloads metadata if it is not available"
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (NO_LEADER, 'topic_no_partitions', [])
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])

    # topic metadata is loaded but empty
    self.assertDictEqual({}, client.topics_to_brokers)

    topics = [
        (NO_ERROR, 'topic_one_partition', [
            (NO_ERROR, 0, 0, [0, 1], [0, 1])
        ])
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    # calling _get_leader_for_partition (from any broker aware request)
    # will try loading metadata again for the same topic
    leader = client._get_leader_for_partition('topic_one_partition', 0)

    self.assertEqual(brokers[0], leader)
    self.assertDictEqual({
        TopicPartition('topic_one_partition', 0): brokers[0]},
        client.topics_to_brokers)
def test_correlation_rollover(self):
    with patch.object(SimpleClient, 'load_metadata_for_topics'):
        big_num = 2**31 - 3
        client = SimpleClient(hosts=[], correlation_id=big_num)
        self.assertEqual(big_num + 1, client._next_id())
        self.assertEqual(big_num + 2, client._next_id())
        self.assertEqual(0, client._next_id())
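The assertions above pin down the rollover behaviour: correlation ids stay within the signed 32-bit range, climbing to 2**31 - 1 and then wrapping to 0. A sketch of wrap-around arithmetic consistent with the test (not necessarily the library's exact code):

# Hypothetical next-id logic implied by the test: increment modulo 2**31.
def next_id(current):
    return (current + 1) % 2**31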
def setUp(self):
    super(KafkaIntegrationTestCase, self).setUp()
    if not os.environ.get('KAFKA_VERSION'):
        self.skipTest('Integration test requires KAFKA_VERSION')

    if not self.topic:
        topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
        self.topic = topic

    if self.create_client:
        self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))

    self.client.ensure_topic_exists(self.topic)

    self._messages = {}
def simple_client(kafka_broker):
    return SimpleClient(get_connect_str(kafka_broker))
def assert_message_count(self, topic, check_count, timeout=10,
                         partitions=None, at_least=False):
    hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                      for broker in self.brokers])

    client = SimpleClient(hosts, timeout=2)
    consumer = SimpleConsumer(client, None, topic,
                              partitions=partitions,
                              auto_commit=False,
                              iter_timeout=timeout)

    started_at = time.time()
    pending = -1
    while pending < check_count and (time.time() - started_at < timeout):
        try:
            pending = consumer.pending(partitions)
        except FailedPayloadsError:
            pass
        time.sleep(0.5)

    consumer.stop()
    client.close()

    if pending < check_count:
        self.fail('Too few pending messages: found %d, expected %d' % (pending, check_count))
    elif pending > check_count and not at_least:
        self.fail('Too many pending messages: found %d, expected %d' % (pending, check_count))
    return True
def __init__(
    self,
    spark,
    df_schema,
    key_deserializer,
    value_deserializer,
    host,
    topic,
    port=9092,
):
    """Initialize context manager

    Parameters `key_deserializer` and `value_deserializer` are callables
    which get bytes as input and should return python structures as output.

    Args:
        spark (SparklySession): currently active SparklySession
        df_schema (pyspark.sql.types.StructType): schema of dataframe to be generated
        key_deserializer (function): function used to deserialize the key
        value_deserializer (function): function used to deserialize the value
        host (basestring): host or ip address of the kafka server to connect to
        topic (basestring): Kafka topic to monitor
        port (int): port number of the Kafka server to connect to
    """
    self.spark = spark
    self.topic = topic
    self.df_schema = df_schema
    self.key_deser, self.val_deser = key_deserializer, value_deserializer
    self.host, self.port = host, port
    self._df = None
    self.count = 0

    kafka_client = SimpleClient(host)
    kafka_client.ensure_topic_exists(topic)
def kafka_get_topics_offsets(host, topic, port=9092):
    """Return available partitions and their offsets for the given topic.

    Args:
        host (str): Kafka host.
        topic (str): Kafka topic.
        port (int): Kafka port.

    Returns:
        [(int, int, int)]: [(partition, start_offset, end_offset)].
    """
    brokers = ['{}:{}'.format(host, port)]
    client = SimpleClient(brokers)

    offsets = []
    partitions = client.get_partition_ids_for_topic(topic)

    offsets_responses_end = client.send_offset_request(
        [OffsetRequestPayload(topic, partition, -1, 1) for partition in partitions]
    )
    offsets_responses_start = client.send_offset_request(
        [OffsetRequestPayload(topic, partition, -2, 1) for partition in partitions]
    )

    for start_offset, end_offset in zip(offsets_responses_start, offsets_responses_end):
        offsets.append((start_offset.partition,
                        start_offset.offsets[0],
                        end_offset.offsets[0]))

    return offsets
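A possible invocation of the helper above; the host and topic name are placeholders:

# Prints per-partition retained message counts using the function defined above.
for partition, start, end in kafka_get_topics_offsets('localhost', 'my-topic'):
    print('partition %d: offsets %d..%d (%d messages retained)'
          % (partition, start, end, end - start))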
def test_get_leader_exceptions_when_noleader(self, protocol, conn):
    mock_conn(conn)

    brokers = [
        BrokerMetadata(0, 'broker_1', 4567, None),
        BrokerMetadata(1, 'broker_2', 5678, None)
    ]
    resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))

    topics = [
        (NO_ERROR, 'topic_noleader', [
            (NO_LEADER, 0, -1, [], []),
            (NO_LEADER, 1, -1, [], []),
        ]),
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)

    client = SimpleClient(hosts=['broker_1:4567'])
    self.assertDictEqual(
        {
            TopicPartition('topic_noleader', 0): None,
            TopicPartition('topic_noleader', 1): None
        },
        client.topics_to_brokers)

    # No leader partitions -- raise LeaderNotAvailableError
    with self.assertRaises(LeaderNotAvailableError):
        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
    with self.assertRaises(LeaderNotAvailableError):
        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))

    # Unknown partitions -- raise UnknownTopicOrPartitionError
    with self.assertRaises(UnknownTopicOrPartitionError):
        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))

    topics = [
        (NO_ERROR, 'topic_noleader', [
            (NO_ERROR, 0, 0, [0, 1], [0, 1]),
            (NO_ERROR, 1, 1, [1, 0], [1, 0])
        ]),
    ]
    protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
    self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
    self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))