The following 24 code examples, extracted from open-source Python projects, illustrate how to use kafka.TopicPartition().
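All of the examples follow the same basic pattern: build a TopicPartition, assign it to a KafkaConsumer, then seek and read offsets on it. The sketch below shows that pattern in its simplest form; the broker address and topic name are placeholders, not taken from any of the projects.

from kafka import KafkaConsumer, TopicPartition

# Placeholder broker and topic names, used only for illustration.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('example-topic', 0)   # immutable (topic, partition) pair
consumer.assign([tp])                     # manual assignment instead of subscribe()
consumer.seek_to_beginning(tp)            # rewind to the earliest available offset
print(consumer.position(tp))              # offset of the next record to be fetched
consumer.close()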
def get(self):
    """Return the current consumer lag for each partition of the topic."""
    topic_partitions = self._client.cluster.partitions_for_topic(self.topic)
    if not topic_partitions:
        future = self._client.cluster.request_update()
        log.info("No partitions available, performing metadata update.")
        self._client.poll(future=future)
        return {}

    partitions = [TopicPartition(self.topic, partition_id)
                  for partition_id in topic_partitions]
    # Latest available offsets (-1) versus the group's committed offsets.
    offsets = self.offsets(partitions, -1)
    committed = self.fetch_committed_offsets(partitions)

    lags = {}
    for tp, offset in six.iteritems(offsets):
        commit_offset = committed[tp] if tp in committed else 0
        numerical = commit_offset if isinstance(commit_offset, int) else commit_offset.offset
        lag = offset - numerical
        pid = tp.partition if isinstance(tp, TopicPartition) else tp
        log.debug("Lag for %s (%s): %s, %s, %s", self.topic, pid, offset, commit_offset, lag)
        lags[pid] = lag
    return lags
def get_offset_start(brokers, topic=mjolnir.kafka.TOPIC_RESULT):
    """Find the current ending offset for all partitions in topic.

    By calling this prior to producing requests we know all responses come
    after these offsets.
    TODO: This naming doesn't feel right...

    Parameters
    ----------
    brokers : list of str
    topic : str

    Returns
    -------
    list of int
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers,
                                   api_version=mjolnir.kafka.BROKER_VERSION)
    parts = consumer.partitions_for_topic(topic)
    if parts is None:
        return None
    partitions = [kafka.TopicPartition(topic, p) for p in parts]
    consumer.assign(partitions)
    return [consumer.position(p) for p in partitions]
def __init__(self, location, topic, group, partition_id):
    self._location = location
    self._group = group
    self._topic = topic
    self._consumer = KafkaConsumer(
        bootstrap_servers=self._location,
        group_id=self._group,
        max_partition_fetch_bytes=10485760,
        consumer_timeout_ms=100,
        client_id="%s-%s" % (self._topic, str(partition_id) if partition_id is not None else "all"),
        request_timeout_ms=120 * 1000,
    )

    if partition_id is not None:
        self._partition_ids = [TopicPartition(self._topic, partition_id)]
        self._consumer.assign(self._partition_ids)
    else:
        self._partition_ids = [TopicPartition(self._topic, pid)
                               for pid in self._consumer.partitions_for_topic(self._topic)]
        self._consumer.subscribe(topics=[self._topic])
        if self._consumer._use_consumer_group():
            self._consumer._coordinator.ensure_coordinator_known()
            self._consumer._coordinator.ensure_active_group()

    self._consumer._update_fetch_positions(self._partition_ids)
    self._start_looping_call()
def seek(self, consumer, topic, partition):
    KafkaOffset = apps.get_model(app_label='logpipe', model_name='KafkaOffset')
    tp = kafka.TopicPartition(topic=topic, partition=partition)
    try:
        obj = KafkaOffset.objects.get(topic=topic, partition=partition)
        logger.debug('Seeking to offset "%s" on topic "%s", partition "%s"' % (obj.offset, topic, partition))
        consumer.client.seek(tp, obj.offset)
    except KafkaOffset.DoesNotExist:
        logger.debug('Seeking to beginning of topic "%s", partition "%s"' % (topic, partition))
        consumer.client.seek_to_beginning(tp)
def _get_topic_partitions(self):
    p = []
    partitions = self.client.partitions_for_topic(self.topic_name)
    if not partitions:
        raise MissingTopicError('Could not find topic %s. Does it exist?' % self.topic_name)
    for partition in partitions:
        tp = kafka.TopicPartition(self.topic_name, partition=partition)
        p.append(tp)
    return p
def assign_to_topic_partition(self, topic_partition=None):
    """Assign a list of TopicPartitions to this consumer.

    - ``topic_partition`` (`TopicPartition` or list of `TopicPartition`): Assignment for this instance.
    """
    if isinstance(topic_partition, TopicPartition):
        topic_partition = [topic_partition]
    if not self._is_assigned(topic_partition):
        self.consumer.assign(topic_partition)
def get_position(self, topic_partition=None):
    """Return offset of the next record that will be fetched.

    - ``topic_partition`` (`TopicPartition`): Partition to check.
    """
    if isinstance(topic_partition, TopicPartition):
        return self.consumer.position(topic_partition)
    else:
        raise TypeError("topic_partition must be of type TopicPartition, "
                        "create it with Create TopicPartition keyword.")
def seek(self, offset, topic_partition=None):
    """Manually specify the fetch offset for a TopicPartition.

    - ``offset``: Message offset in partition.
    - ``topic_partition`` (`TopicPartition`): Partition for seek operation.
    """
    if isinstance(topic_partition, TopicPartition):
        self.consumer.seek(topic_partition, offset=offset)
    else:
        raise TypeError("topic_partition must be of type TopicPartition, "
                        "create it with Create TopicPartition keyword.")
def seek_to_beginning(self, topic_partition=None):
    """Seek to the oldest available offset for partitions.

    - ``topic_partition``: Optionally provide a specific `TopicPartition`,
      otherwise default to all assigned partitions.
    """
    if isinstance(topic_partition, TopicPartition):
        self.consumer.seek_to_beginning(topic_partition)
    else:
        raise TypeError("topic_partition must be of type TopicPartition, "
                        "create it with Create TopicPartition keyword.")
def get_number_of_messages_in_topicpartition(self, topic_partition=None):
    """Return number of messages in TopicPartition.

    - ``topic_partition`` (`TopicPartition` or list of `TopicPartition`)
    """
    if isinstance(topic_partition, TopicPartition):
        topic_partition = [topic_partition]

    number_of_messages = 0
    assignment = self.consumer.assignment()
    self.consumer.unsubscribe()
    for partition in topic_partition:
        if not isinstance(partition, TopicPartition):
            raise TypeError("topic_partition must be of type TopicPartition, "
                            "create it with Create TopicPartition keyword.")
        self.assign_to_topic_partition(partition)
        self.consumer.seek_to_end(partition)
        end = self.consumer.position(partition)
        self.consumer.seek_to_beginning(partition)
        start = self.consumer.position(partition)
        number_of_messages += end - start
    self.consumer.unsubscribe()
    self.consumer.assign(assignment)
    return number_of_messages
def commit(self, offsets=None):
    """Commit offsets to kafka, blocking until success or error.

    - ``offsets`` (dict): `{TopicPartition: OffsetAndMetadata}` dict to commit
      with the configured group_id. Defaults to currently consumed offsets
      for all subscribed partitions.
    """
    self.consumer.commit(offsets)
def committed(self, topic_partition):
    """Return the last committed offset for the given partition, or None if there was no prior commit.

    - ``topic_partition`` (`TopicPartition`): The partition to check.
    """
    return self.consumer.committed(topic_partition)
def create_topicpartition(self, topic, partition):
    """Create TopicPartition object.

    - ``topic``: kafka topic name.
    - ``partition``: topic partition number.
    """
    return TopicPartition(topic=topic, partition=partition)
def _get_result_offsets(self):
    """Get the latest offsets for all partitions in topic."""
    consumer = kafka.KafkaConsumer(bootstrap_servers=self.brokers,
                                   auto_offset_reset='latest',
                                   api_version=mjolnir.kafka.BROKER_VERSION)
    partitions = [kafka.TopicPartition(self.topic_result, p)
                  for p in consumer.partitions_for_topic(self.topic_result)]
    consumer.assign(partitions)
    consumer.seek_to_end()
    offsets = [consumer.position(tp) for tp in partitions]
    consumer.close()
    return offsets
def poll(topic, partition=0, offset=0, hostname=None, port_num=None, max_timeout=100):
    hostname, port_num = insure_host_port(hostname, port_num)
    server = hostname + ':' + str(port_num)
    topic_partition = TopicPartition(topic, partition)
    consumer = KafkaConsumer(bootstrap_servers=server, group_id=None)
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, offset)
    msgs = list(consumer.poll(max_timeout).values())
    consumer.close()
    if len(msgs) > 0:
        return msgs[0]
    else:
        return {}
def info(self):
    """Print the offset information for all topics and partitions."""
    print('Offsets per Topic:')
    for topic in self._consumer.topics():
        print('\nTopic {}:\n'.format(topic))
        partitions = self._consumer.partitions_for_topic(topic)
        if partitions is None:  # pragma: no cover
            print(' Polling failed (please try again)')
            continue
        for partition in partitions:
            topic_partition = kafka.TopicPartition(topic, partition)
            self._consumer.assign([topic_partition])
            offset = self._consumer.position(topic_partition)
            print(' Partition {:<3}: {}'.format(partition, offset))
def offsets(self, partitions, timestamp):
    """Fetch a single offset before the given timestamp for the set of partitions.

    Blocks until offsets are obtained, or a non-retriable exception is raised.

    Arguments:
        partitions (iterable of TopicPartition): The partitions that need fetching offset.
        timestamp (int): timestamp for fetching offset. -1 for the latest
            available, -2 for the earliest available. Otherwise timestamp is
            treated as epoch seconds.

    Returns:
        dict: TopicPartition and message offsets
    """
    while True:
        offsets = {}
        ok = True
        for future in self._send_offset_request(partitions, timestamp):
            self._client.poll(future=future)

            if future.succeeded():
                for tp, offset in future.value:
                    offsets[tp] = offset
                continue

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            if future.exception.invalid_metadata:
                refresh_future = self._client.cluster.request_update()
                self._client.poll(future=refresh_future, sleep=True)
                ok = False
                break
        if ok:
            return offsets
def _send_offset_request(self, partitions, timestamp):
    """Fetch a single offset before the given timestamp for the partitions.

    Arguments:
        partitions (iterable of TopicPartition): partitions that need fetching offset
        timestamp (int): timestamp for fetching offset

    Returns:
        list of Future: each resolves to the corresponding offsets
    """
    topic = partitions[0].topic
    nodes_per_partitions = {}
    for partition in partitions:
        node_id = self._client.cluster.leader_for_partition(partition)
        if node_id is None:
            log.debug("Partition %s is unknown for fetching offset,"
                      " wait for metadata refresh", partition)
            return Future().failure(Errors.StaleMetadata(partition))
        elif node_id == -1:
            log.debug("Leader for partition %s unavailable for fetching offset,"
                      " wait for metadata refresh", partition)
            return Future().failure(Errors.LeaderNotAvailableError(partition))
        nodes_per_partitions.setdefault(node_id, []).append(partition)

    # Client returns a future that only fails on network issues,
    # so create a separate future and attach a callback to update it
    # based on response error codes
    futures = []
    for node_id, node_partitions in six.iteritems(nodes_per_partitions):
        request = OffsetRequest[0](
            -1, [(topic, [(partition.partition, timestamp, 1)
                          for partition in node_partitions])]
        )
        future_request = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_response, node_partitions, future_request)
        # The default argument binds this iteration's future to its errback.
        _f.add_errback(lambda e, f=future_request: f.failure(e))
        futures.append(future_request)

    return futures
def _handle_offset_response(self, partitions, future, response):
    """Callback for the response of the list offset call above.

    Arguments:
        partitions (list of TopicPartition): the partitions that were fetched
        future (Future): the future to update based on response
        response (OffsetResponse): response from the server

    Raises:
        AssertionError: if response does not match partition
    """
    topic, partition_info = response.topics[0]
    assert len(response.topics) == 1, (
        'OffsetResponse should only be for a single topic')
    partition_ids = set([part.partition for part in partitions])
    result = []
    for pi in partition_info:
        part, error_code, offsets = pi
        assert topic == partitions[0].topic and part in partition_ids, (
            'OffsetResponse partition does not match OffsetRequest partition')
        error_type = Errors.for_code(error_code)
        if error_type is Errors.NoError:
            assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
            log.debug("Fetched offset %s for partition %d", offsets[0], part)
            result.append((TopicPartition(topic, part), offsets[0]))
        elif error_type in (Errors.NotLeaderForPartitionError,
                            Errors.UnknownTopicOrPartitionError):
            log.debug("Attempt to fetch offsets for partition %s failed due"
                      " to obsolete leadership information, retrying.",
                      str(partitions))
            future.failure(error_type(partitions))
        else:
            log.warning("Attempt to fetch offsets for partition %s failed due to:"
                        " %s", partitions, error_type)
            future.failure(error_type(partitions))
    future.success(result)
def fetch_committed_offsets(self, partitions):
    """Fetch the current committed offsets for specified partitions.

    Arguments:
        partitions (list of TopicPartition): partitions to fetch

    Returns:
        dict: {TopicPartition: OffsetAndMetadata}
    """
    if not partitions:
        return {}

    while True:
        self._ensure_coordinator_known()

        # contact coordinator to fetch committed offsets
        future = self._send_offset_fetch_request(partitions)
        self._client.poll(future=future)

        if future.succeeded():
            return future.value

        if not future.retriable():
            raise future.exception  # pylint: disable-msg=raising-bad-type

        time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def fetch_lastest_checkpoint(self) -> BinLogMetadata:
    """Return the most recent checkpoint message from partition 0 of the topic, or None if it is empty."""
    partition = TopicPartition(self._topic, 0)
    self._consumer.assign([partition])
    self._consumer.seek_to_end()
    offset = self._consumer.position(partition)
    if offset is None or offset == 0:
        return None
    # Step back one message and read it to recover the last binlog position.
    self._consumer.seek(partition, offset - 1)
    for message in self._consumer:
        metadata_attributes = json.loads(message.value.decode())
        metadata = BinLogMetadata(log_pos=metadata_attributes['pos'],
                                  log_file=metadata_attributes['file'])
        return metadata
    return None
def seek_to_end(self, topic_partition=None):
    """Seek to the most recent available offset for partitions.

    - ``topic_partition``: Optionally provide a specific `TopicPartition`,
      otherwise default to all assigned partitions.
    """
    if isinstance(topic_partition, TopicPartition):
        self.consumer.seek_to_end(topic_partition)
    else:
        raise TypeError("topic_partition must be of type TopicPartition, "
                        "create it with Create TopicPartition keyword.")
def get_offset_end(brokers, run_id, num_end_sigils, topic=mjolnir.kafka.TOPIC_COMPLETE):
    """Find the offset of the last message of our run.

    The 'end run' message gets reflected, by the client running on relforge,
    back into TOPIC_COMPLETE into all partitions. This reads those partitions
    and looks for the ending offset of all partitions based on that reflected
    message.

    Parameters
    ----------
    brokers : list of str
    run_id : str
        Unique identifier for this run
    num_end_sigils : int
        The number of unique end run sigils to expect. This should be the
        number of partitions of the topic requests were produced to.
    topic : str, optional
        Topic to look for end run messages in

    Returns
    -------
    list of int
        The offset of the end run message for all partitions
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers,
                                   # The topic we are reading from is very low volume,
                                   # containing only reflected end run sigils. To make
                                   # sure we don't miss one start at the beginning.
                                   auto_offset_reset='earliest',
                                   value_deserializer=json.loads,
                                   api_version=mjolnir.kafka.BROKER_VERSION)
    parts = consumer.partitions_for_topic(topic)
    if parts is None:
        raise RuntimeError("topic %s missing" % topic)

    partitions = [kafka.TopicPartition(topic, p) for p in parts]
    consumer.assign(partitions)

    # Tracks the maximum reported offset in the response topic
    offsets_end = [-1] * num_end_sigils
    # Tracks the sigils that have been seen for the request topics.
    # Uses a set in case duplicate messages are sent somehow, to ensure
    # we see a message for all expected partitions.
    seen_sigils = set()
    for message in consumer:
        if 'run_id' in message.value and message.value['run_id'] == run_id and 'complete' in message.value:
            print('found sigil for run %s and partition %d'
                  % (message.value['run_id'], message.value['partition']))
            for partition, offset in enumerate(message.value['offsets']):
                offsets_end[partition] = max(offsets_end[partition], offset)
            seen_sigils.add(message.value['partition'])
            # Keep reading until all sigils have been reflected.
            if len(seen_sigils) >= num_end_sigils:
                consumer.close()
                return offsets_end
    consumer.close()
    raise RuntimeError("Finished consuming, but %d partitions remain"
                       % (len(partitions) - len(seen_sigils)))
def _send_offset_fetch_request(self, partitions):
    """Fetch the committed offsets for a set of partitions.

    This is a non-blocking call. The returned future can be polled to get
    the actual offsets returned from the broker.

    Arguments:
        partitions (list of TopicPartition): the partitions to fetch

    Returns:
        Future: resolves to dict of offsets: {TopicPartition: int}
    """
    assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
    assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
    if not partitions:
        return Future().success({})
    elif self._coordinator_unknown():
        return Future().failure(Errors.GroupCoordinatorNotAvailableError)

    node_id = self._coordinator_id

    # Verify node is ready
    if not self._client.ready(node_id):
        log.debug("Node %s not ready -- failing offset fetch request",
                  node_id)
        return Future().failure(Errors.NodeNotReadyError)

    log.debug("Group %s fetching committed offsets for partitions: %s",
              self.group_id, partitions)

    # construct the request
    topic_partitions = collections.defaultdict(set)
    for tp in partitions:
        topic_partitions[tp.topic].add(tp.partition)

    if self.config['api_version'] >= (0, 8, 2):
        request = OffsetFetchRequest[1](
            self.group_id,
            list(topic_partitions.items())
        )
    else:
        request = OffsetFetchRequest[0](
            self.group_id,
            list(topic_partitions.items())
        )

    # send the request with a callback
    future = Future()
    _f = self._client.send(node_id, request)
    _f.add_callback(self._handle_offset_fetch_response, future)
    _f.add_errback(self._failed_request, node_id, request, future)
    return future