我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用logging.LogRecord()。
def format(self, record):
    """Format a record, optionally adding an arrow prefix and a color.

    The parent formatter's output is returned untouched when ``self.verbose``
    is truthy or when the logger name does not start with
    ``self.SPECIAL_SCOPE``. Otherwise ``'=> '`` is prepended, and, when
    ``self.colors`` is truthy, the text is colored by severity: red for
    ERROR and above, yellow for WARNING and above, cyan for everything else.

    :param logging.LogRecord record: The log record object to log.

    :return: The formatted (and possibly decorated) message.
    """
    text = super(ColorFormatter, self).format(record)

    # Same short-circuit order as "verbose or not in scope": the logger name
    # is only inspected when verbose mode is off.
    decorate = not self.verbose and record.name.startswith(self.SPECIAL_SCOPE)
    if not decorate:
        return text

    # Arrow.
    text = '=> ' + text
    if not self.colors:
        return text

    # Colors.
    if record.levelno >= logging.ERROR:
        paint = colorclass.Color.red
    elif record.levelno >= logging.WARNING:
        paint = colorclass.Color.yellow
    else:
        paint = colorclass.Color.cyan
    return str(paint(text))
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
    """
    Create a LogRecord, choosing a structured record when named properties are supplied.

    When ``extra`` is truthy and contains the key ``'log_props'``, a
    ``StructuredLogRecord`` is built and handed ``extra['log_props']``;
    otherwise record creation is delegated to the parent class.

    :param name: The name of the logger that produced the log record.
    :param level: The logging level (severity) associated with the logging record.
    :param fn: The name of the file (if known) where the log entry was created.
    :param lno: The line number (if known) in the file where the log entry was created.
    :param msg: The log message (or message template).
    :param args: Ordinal message format arguments (if any).
    :param exc_info: Exception information to be included in the log entry.
    :param func: The function (if known) where the log entry was created.
    :param extra: Extra information (if any) to add to the log record.
    :param sinfo: Stack trace information (if known) for the log entry.
    :return: The newly created record.
    """
    has_named_props = bool(extra) and 'log_props' in extra
    if not has_named_props:
        # No named format arguments: fall back to the stock behaviour.
        return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)

    named_props = extra['log_props']
    return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, named_props)
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
    """
    Create a `LogRecord`, or a `StructuredLogRecord` when named properties are supplied.

    A `StructuredLogRecord` is returned only when ``extra`` is truthy and has
    a ``'log_props'`` key; in every other case the parent implementation
    creates the record.

    :param name: The name of the logger that produced the log record.
    :param level: The logging level (severity) associated with the logging record.
    :param fn: The name of the file (if known) where the log entry was created.
    :param lno: The line number (if known) in the file where the log entry was created.
    :param msg: The log message (or message template).
    :param args: Ordinal message format arguments (if any).
    :param exc_info: Exception information to be included in the log entry.
    :param func: The function (if known) where the log entry was created.
    :param extra: Extra information (if any) to add to the log record.
    :param sinfo: Stack trace information (if known) for the log entry.
    :return: The newly created record.
    """
    # Do we have named format arguments?
    use_structured = bool(extra) and 'log_props' in extra
    if use_structured:
        props = extra['log_props']
        return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, props)

    return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
def _get_local_timestamp(record):
    """
    Render the record's creation time as an ISO-formatted date / time string.

    The timestamp is built from ``record.created`` with the timezone returned
    by ``tzlocal()`` attached, so the result is expressed in that zone.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: The ISO-formatted date / time string (date and time separated by a space).
    :rtype: str
    """
    created = record.created
    zone = tzlocal()
    moment = datetime.fromtimestamp(created, tz=zone)
    return moment.isoformat(sep=' ')
def __is_kms_encrypt_request(self, record):  # pylint: disable=no-self-use
    # type: (logging.LogRecord) -> bool
    """Tell whether a record describes a kms:Encrypt request.

    A record qualifies when it comes from the ``botocore.endpoint`` logger,
    its message starts with ``Making request``, and the last format argument
    carries an ``X-Amz-Target`` header equal to ``TrentService.Encrypt``.
    Any error raised while inspecting the record is treated as "no".

    :param record: Logging record to filter
    :type record: logging.LogRecord
    :rtype: bool
    """
    try:
        request_info = cast(tuple, record.args)[-1]
        checks = (
            record.name == 'botocore.endpoint',
            record.msg.startswith('Making request'),
            request_info['headers']['X-Amz-Target'] == 'TrentService.Encrypt',
        )
    except Exception:  # pylint: disable=broad-except
        # Records of an unexpected shape simply do not match.
        return False
    return all(checks)
def __is_kms_response_with_plaintext(self, record):  # pylint: disable=no-self-use
    # type: (logging.LogRecord) -> bool
    """Tell whether a record holds a KMS response body that includes plaintext.

    A record qualifies when it comes from the ``botocore.parsers`` logger,
    its message starts with ``Response body:``, and the first format argument
    contains both ``b'KeyId'`` and ``b'Plaintext'``. Any error raised while
    inspecting the record is treated as "no".

    :param record: Logging record to filter
    :type record: logging.LogRecord
    :rtype: bool
    """
    try:
        body = cast(tuple, record.args)[0]
        checks = (
            record.name == 'botocore.parsers',
            record.msg.startswith('Response body:'),
            b'KeyId' in body,
            b'Plaintext' in body,
        )
    except Exception:  # pylint: disable=broad-except
        # Records of an unexpected shape simply do not match.
        return False
    return all(checks)
def __init__(self, name, level, fn, lno, msg, args, exc_info, func, extra):
    """Initialise the record and attach the structured log details to it.

    The details returned by ``get_log_details()`` are merged with ``extra``
    and copied onto the record as attributes. The ``"logger"`` entry of the
    details is then filled with selected record fields, the request
    correlation id (``None`` when it cannot be read) and a UTC creation
    timestamp, and the details are copied onto the record once more.

    :param name: Logger name.
    :param level: Numeric logging level.
    :param fn: Path of the source file that issued the logging call.
    :param lno: Line number of the logging call.
    :param msg: Log message (or message template).
    :param args: Message format arguments.
    :param exc_info: Exception information, if any.
    :param func: Name of the function that issued the logging call.
    :param extra: Optional mapping of additional details; may be ``None``.
    """
    logging.LogRecord.__init__(self, name, level, fn, lno, msg, args, exc_info, func)

    log_details = get_log_details()
    log_details.update(extra or {})

    # First pass: expose the details as attributes *before* the logger fields
    # are read back from ``self`` below. ``items()`` is used instead of
    # ``iterkeys()`` because the latter does not exist on Python 3.
    for key, value in log_details.items():
        setattr(self, key, value)

    logger_fields = ("levelname", "levelno", "process", "thread", "name",
                     "filename", "module", "funcName", "lineno")
    for field in logger_fields:
        log_details["logger"][field] = getattr(self, field, None)

    # Best effort: reading the correlation id may fail for any reason.
    try:
        correlation_id = request.correlation_id
    except Exception:
        correlation_id = None

    log_details["logger"]["correlation_id"] = correlation_id
    log_details["logger"]["created"] = datetime.datetime.utcnow().isoformat() + "Z"

    # Second pass: re-apply so attributes reflect the final details.
    for key, value in log_details.items():
        setattr(self, key, value)
def test_custom_handler(self, mocker):
    """A handler installed on the logger receives the emitted ``LogRecord``."""
    handler = DummyHandler()
    emit_spy = mocker.MagicMock()
    handler.emit = emit_spy

    logger = make_logger()
    logger.handlers = [handler]
    disable(NOTSET)

    logger.debug('test')

    assert emit_spy.call_count == 1
    name, args, _kwargs = emit_spy.mock_calls[0]
    assert name == ''

    log_record = args[0]
    assert isinstance(log_record, LogRecord)
    assert log_record.msg == 'test'
    assert log_record.levelname == 'DEBUG'

    del logger
def format(self, record):
    """Render the record as text, wrapped in red when it is severe enough.

    The text is wrapped in ``util.ANSI_SGR_RED`` / ``util.ANSI_SGR_RESET``
    only when ``self.colored`` is truthy and the record's level is at least
    ``self.red_level``.

    :param logging.LogRecord record: Record to be formatted

    :returns: Formatted, string representation of record
    :rtype: str

    """
    # Interpreters older than 2.7 take the explicit base-class call instead
    # of super().
    if sys.version_info < (2, 7):
        out = logging.StreamHandler.format(self, record)
    else:
        out = super(ColoredStreamHandler, self).format(record)

    paint_red = self.colored and record.levelno >= self.red_level
    if not paint_red:
        return out
    return ''.join([util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET])
def test_pretty_xml_handler(self):
    """Check PrettyXmlHandler output for a plain record and for an XML record."""

    def make_tty_stream():
        # In-memory stream that reports itself as a terminal.
        buf = io.BytesIO() if PY2 else io.StringIO()
        buf.isatty = lambda: True
        return buf

    # Test that a normal, non-XML log record is passed through unchanged
    handler = PrettyXmlHandler(stream=make_tty_stream())
    self.assertTrue(handler.is_tty())
    plain_record = logging.LogRecord(
        name='baz', level=logging.INFO, pathname='/foo/bar', lineno=1, msg='hello', args=(), exc_info=None
    )
    handler.emit(plain_record)
    handler.stream.seek(0)
    self.assertEqual(handler.stream.read(), 'hello\n')

    # Test formatting of an XML record. It should contain newlines and color codes.
    handler = PrettyXmlHandler(stream=make_tty_stream())
    xml_record = logging.LogRecord(
        name='baz', level=logging.DEBUG, pathname='/foo/bar', lineno=1, msg='hello %(xml_foo)s',
        args=({'xml_foo': b'<?xml version="1.0" encoding="UTF-8"?><foo>bar</foo>'},), exc_info=None
    )
    handler.emit(xml_record)
    handler.stream.seek(0)
    self.assertEqual(
        handler.stream.read(),
        "hello \x1b[36m<?xml version='1.0' encoding='utf-8'?>\x1b[39;49;00m\n\x1b[34;01m<foo\x1b[39;49;00m\x1b[34;01m>\x1b[39;49;00mbar\x1b[34;01m</foo>\x1b[39;49;00m\n\n"
    )
def test_record_none_exc_info(django_elasticapm_client):
    """A record whose exc_info is the empty triple is reported without an exception."""
    # sys.exc_info can return (None, None, None) if no exception is being
    # handled anywhere on the stack. See:
    # http://docs.python.org/library/sys.html#sys.exc_info
    empty_exc_info = (None, None, None)
    record = logging.LogRecord(
        'foo',
        logging.INFO,
        pathname=None,
        lineno=None,
        msg='test',
        args=(),
        exc_info=empty_exc_info,
    )

    LoggingHandler().emit(record)

    assert len(django_elasticapm_client.events) == 1
    error = django_elasticapm_client.events.pop(0)['errors'][0]
    log_section = error['log']
    assert log_section['param_message'] == 'test'
    assert log_section['logger_name'] == 'foo'
    assert log_section['level'] == 'info'
    assert 'exception' not in error
def format(self, record):
    """
    Style ``record.msg`` with click.style() using style attributes found on the record.

    Every name in ``self.STYLES`` that exists as an attribute on the record
    is collected as a keyword argument. When at least one is found,
    ``record.msg`` is replaced by ``click.style(record.msg, **kwargs)``;
    otherwise the record is left untouched.

    Args:
        record (logging.LogRecord): record which gets styled with click.style()

    Returns:
        The same record object that was passed in.
    """
    # Gather the click styling options that were attached to this record.
    style_kwargs = {
        style_name: getattr(record, style_name)
        for style_name in self.STYLES
        if hasattr(record, style_name)
    }

    # Only restyle the message when a style was actually requested.
    if style_kwargs:
        record.msg = click.style(record.msg, **style_kwargs)
    return record
def terminate(self):
    """Stop the process server once, forwarding queued log records while it exits.

    Repeated calls are no-ops. SIGINT is ignored from the moment shutdown
    starts. While the server is still alive the event queue is drained, with
    every ``logging.LogRecord`` found handed to ``logger``; afterwards the UI
    channel and the event queue are closed.
    """
    if self.terminated:
        return
    self.terminated = True

    def drain_pending_events():
        # Pull everything currently queued; only log records are forwarded.
        while True:
            try:
                pending = self.event_queue.get(block=False)
            except (Empty, IOError):
                return
            if isinstance(pending, logging.LogRecord):
                logger.handle(pending)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    self.procserver.stop()

    while self.procserver.is_alive():
        drain_pending_events()
        self.procserver.join(0.1)

    self.ui_channel.close()
    self.event_queue.close()
    self.event_queue.setexit()

# Wrap Queue to provide API which isn't server implementation specific
def test_connector_log_formatter_dict_format(self):
    """ConnectorLogFormatter renders dict messages into the output string."""
    formatter = ConnectorLogFormatter()
    payload = {'message': 'fake_message', 'type': 'message_type'}
    record = LogRecord('fake_name', 'DEBUG', None, None, payload, None, None)
    record.reseller_name = 'fake_reseller_name'

    # formatter.format returns a string, so only substrings are checked
    rendered = formatter.format(record)
    assert 'fake_reseller_name' in rendered
    assert 'MESSAGE_TYPE' in rendered
    assert datetime.now().isoformat(' ')[:-7] in rendered

    record.msg = {'text': 'fake_text'}
    rendered = formatter.format(record)
    assert 'fake_text' in rendered
def setUp(self):
    """Install a type-checking filter on the root logger and save the record factory."""

    class CheckingFilter(logging.Filter):
        """Filter that raises unless a record's exact type is the expected class."""

        def __init__(self, cls):
            self.cls = cls

        def filter(self, record):
            actual = type(record)
            if actual is self.cls:
                return True
            msg = 'Unexpected LogRecord type %s, expected %s' % (actual, self.cls)
            raise TypeError(msg)

    BaseTest.setUp(self)
    self.filter = CheckingFilter(DerivedLogRecord)
    self.root_logger.addFilter(self.filter)
    self.orig_factory = logging.getLogRecordFactory()
def format(self, record):
    """Format a record as HTML-escaped text, optionally tagging the level with an emoji.

    The parent formatter runs first for its side effects on the record. Then
    ``funcName``, ``name`` and ``msg`` are replaced on the record by their
    HTML-escaped forms (``msg`` becomes the escaped result of
    ``record.getMessage()``). When ``self.use_emoji`` is truthy, a circle
    emoji is appended to ``record.levelname``: white for DEBUG, blue for
    INFO, red for any other level.

    Note that the record itself is modified in place.

    :param logging.LogRecord record: record to format
    :return: ``self.fmt`` interpolated with the record's attribute dict.
    """
    super(HtmlFormatter, self).format(record)

    if record.funcName:
        record.funcName = escape_html(str(record.funcName))
    if record.name:
        record.name = escape_html(str(record.name))
    if record.msg:
        record.msg = escape_html(record.getMessage())

    if self.use_emoji:
        # No diagnostic printing here: a formatter must not write to stdout.
        if record.levelno == logging.DEBUG:
            emoji = EMOJI.WHITE_CIRCLE
        elif record.levelno == logging.INFO:
            emoji = EMOJI.BLUE_CIRCLE
        else:
            emoji = EMOJI.RED_CIRCLE
        record.levelname += ' ' + emoji

    return self.fmt % record.__dict__
def format(self, record):
    """Render the record as text, wrapped in red when it is severe enough.

    The red wrapping (``util.ANSI_SGR_RED`` ... ``util.ANSI_SGR_RESET``) is
    applied only when ``self.colored`` is truthy and the record's level is
    at least ``self.red_level``.

    :param logging.LogRecord record: Record to be formatted

    :returns: Formatted, string representation of record
    :rtype: str

    """
    # Interpreters older than 2.7 take the explicit base-class call instead
    # of super().
    if sys.version_info < (2, 7):
        out = logging.StreamHandler.format(self, record)
    else:
        out = super(StreamHandler, self).format(record)

    paint_red = self.colored and record.levelno >= self.red_level
    if not paint_red:
        return out
    return ''.join([util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET])
def test_json_formatter(self):
    """JSONFormatter output parses as JSON and carries the record's fields."""
    name = 'name'
    line = 42
    module = 'some_module'
    func = 'some_function'
    msg = {'content': 'sample log'}

    log_record = LogRecord(name, INFO, module, line, msg, None, None, func=func)
    result = json.loads(JSONFormatter().format(log_record))

    # check some of the fields to ensure json formatted correctly
    expectations = (
        (name, 'name'),
        (line, 'lineNumber'),
        (func, 'functionName'),
        (module, 'module'),
        ('INFO', 'level'),
        (msg, 'message'),
    )
    for expected, key in expectations:
        self.assertEqual(expected, result[key])
def _two_masters_error_logged(
    self,
    log_records: List[logging.LogRecord],
) -> bool:
    """
    Report whether the "wrong number of masters" error was logged at DEBUG.

    This is prone to being broken as it checks for a string in the DC/OS
    repository.

    Args:
        log_records: Messages logged from the logger.

    Returns:
        ``True`` when at least one DEBUG record's message (converted with
        ``str``) contains the expected text, ``False`` otherwise.
    """
    expected_text = 'Must have 1, 3, 5, 7, or 9 masters'
    return any(
        entry.levelno == logging.DEBUG and expected_text in str(entry.msg)
        for entry in log_records
    )
def format(self, record):
    """Turn a log record into a dictionary entry.

    The entry always holds ``name``, ``timestamp`` (``%Y-%m-%d %H:%M:%S``),
    ``level`` and ``message``. The message is ``record.message`` when that
    attribute already exists, otherwise the parent formatter's output. When
    ``record.exc_info`` is not ``None`` an ``exception`` entry is added with
    the last line of the formatted exception (trailing character removed) as
    ``message`` and all preceding lines as ``traceback``.

    @param object record: :logging.LogRecord:
    @return dict: the entry described above
    """
    entry = {
        'name': record.name,
        'timestamp': self.formatTime(record, datefmt="%Y-%m-%d %H:%M:%S"),
        'level': record.levelname,
    }
    entry['message'] = record.message if hasattr(record, 'message') else super().format(record)

    # add exception information if available
    if record.exc_info is not None:
        exc_lines = traceback.format_exception(*record.exc_info)
        entry['exception'] = {
            'message': exc_lines[-1][:-1],
            'traceback': exc_lines[:-1],
        }
    return entry
def logMessages(self, request):
    """Write log messages from the Swarm FE to the log.

    Each message in ``request.messages`` is turned into a log record and
    handed to this module's logger. A message without a level is logged at
    ``Level.info``.

    Args:
      request: A log message request.

    Returns:
      Void message.
    """
    Level = api_backend.LogMessagesRequest.LogMessage.Level
    frontend_log = logging.getLogger(__name__)

    for message in request.messages:
        if message.level is not None:
            level = message.level
        else:
            level = Level.info

        # Create a log record and override the pathname and lineno. These
        # messages come from the front end, so it's misleading to say that they
        # come from api_backend_service.
        record = logging.LogRecord(
            name=__name__,
            level=level.number,
            pathname='',
            lineno='',
            msg=message.message,
            args=None,
            exc_info=None,
        )
        frontend_log.handle(record)

    return message_types.VoidMessage()
def format(self, record: logging.LogRecord) -> str:
    """Return formatted string of log record.

    The layout is ``<asctime> - <levelname> - <name> - <message>`` with the
    level name right-aligned to 8 columns and the logger name to 26.
    ``record.asctime`` and ``record.message`` are reused when already present
    on the record and filled in (and stored on the record) otherwise.

    Args:
        record: The log record.

    Returns:
        Formatted string of log record.
    """
    if not hasattr(record, 'asctime'):
        record.asctime = self.formatTime(record, self.datefmt)
    if not hasattr(record, 'message'):
        # getMessage() merges record.args into the template; copying
        # record.msg verbatim would leave "%s" placeholders in the output.
        record.message = record.getMessage()

    return '{} - {:>8} - {:>26} - {}'.format(
        record.asctime, record.levelname, record.name, record.message
    )
def handle_new_task(self, task_name, record):
    """
    Register a task that is starting and, if appropriate, emit its header.

    The record's message is replaced by the colored start-of-task text and
    tagged with the task name. A new ``Task`` (buffer length
    ``self.buffer_size``) is stored under ``task_name``, and the record is
    emitted as a header when ``self.should_show_by_depth()`` allows it.

    Params:
        task_name (str): name of the task that is starting
        record (logging.LogRecord): log record with all the info

    Returns:
        None
    """
    record.msg = ColorFormatter.colored('default', START_TASK_MSG)
    record.task = task_name

    new_task = Task(name=task_name, maxlen=self.buffer_size)
    self.tasks[task_name] = new_task

    if not self.should_show_by_depth():
        return
    self.pretty_emit(record, is_header=True)
def test_label_nonstring():
    """Logging things that aren't string is fine"""
    messages = (
        "Proper message",
        12,
        {"a": "dict"},
        list(),
        1.0,
    )
    records = [
        logging.LogRecord("root", "INFO", "", 0, message, [], None)
        for message in messages
    ]
    result = {"records": records, "error": None}

    terminal = model.Terminal()
    terminal.update_with_result(result)

    for item in terminal:
        assert isinstance(item.data(model.Label), six.text_type), (
            "\"%s\" wasn't a string!" % item.data(model.Label))
def emit(self, record):
    """
    Emit a log record by placing it on ``self.log_queue`` without blocking.

    :param record: The LogRecord.
    """
    target_queue = self.log_queue
    target_queue.put(record, block=False)
def _build_event_data(record):
    """
    Build an event data dictionary from the specified log record for submission to Seq.

    Three cases are distinguished:

    * the record has format arguments: the template is the fully formatted
      message and each argument is added to the properties obtained from
      ``get_global_log_properties(record.name)`` under its 0-based index
      (as a string);
    * the record is a ``StructuredLogRecord``: the template is the raw
      ``record.msg`` and the properties are ``record.log_props``;
    * anything else: the template is the formatted message and the
      properties are ``_global_log_props``.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """
    if record.args:
        # Standard (unnamed) format arguments (use 0-base index as property name).
        properties = get_global_log_properties(record.name)
        for position, value in enumerate(record.args):
            properties[str(position)] = value
        use_raw_template = False
    elif isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        properties = record.log_props
        use_raw_template = True
    else:
        # No format arguments; interpret message as-is.
        properties = _global_log_props
        use_raw_template = False

    timestamp = _get_local_timestamp(record)
    level_name = logging.getLevelName(record.levelno)
    template = record.msg if use_raw_template else record.getMessage()

    return {
        "Timestamp": timestamp,
        "Level": level_name,
        "MessageTemplate": template,
        "Properties": properties,
    }
def __redact_encrypt_request(self, record):
    # type: (logging.LogRecord) -> None
    """Replace the ``Plaintext`` value in a kms:Encrypt request body.

    The last format argument's ``'body'`` entry is parsed as JSON, its
    ``Plaintext`` key is overwritten with the redaction marker, and the
    result is written back (keys sorted). Any failure leaves the record as
    it was at the point of failure and is silently ignored.

    :param record: Logging record to filter
    :type record: logging.LogRecord
    """
    try:
        request_info = cast(tuple, record.args)[-1]
        body = json.loads(self.__to_str(request_info['body']))
        body['Plaintext'] = _REDACTED
        request_info['body'] = json.dumps(body, sort_keys=True)
    except Exception:  # pylint: disable=broad-except
        return
def __redact_key_from_response(self, record):
    # type: (logging.LogRecord) -> None
    """Replace the ``Plaintext`` value in a KMS response body.

    The first format argument is parsed as JSON, its ``Plaintext`` key is
    overwritten with the redaction marker, and ``record.args`` is rebuilt
    with the re-serialized body (keys sorted) in first position. Any failure
    is silently ignored.

    :param record: Logging record to filter
    :type record: logging.LogRecord
    """
    try:
        original_args = cast(tuple, record.args)
        body = json.loads(self.__to_str(original_args[0]))
        body['Plaintext'] = _REDACTED
        redacted_body = json.dumps(body, sort_keys=True)
        record.args = (redacted_body,) + cast(tuple, record.args)[1:]
    except Exception:  # pylint: disable=broad-except
        return
def __redact_record(self, record):
    # type: (logging.LogRecord) -> logging.LogRecord
    """Return a redacted deep copy of a record.

    The copy is redacted as a kms:Encrypt request if it looks like one, else
    as a KMS plaintext response if it looks like one, else left as copied.
    The record passed in is never modified.

    :param record: Logging record to filter
    :type record: logging.LogRecord
    """
    safe_copy = copy.deepcopy(record)

    if self.__is_kms_encrypt_request(safe_copy):
        self.__redact_encrypt_request(safe_copy)
        return safe_copy

    if self.__is_kms_response_with_plaintext(safe_copy):
        self.__redact_key_from_response(safe_copy)
    return safe_copy
def filter(self, record):
    # type: (logging.LogRecord) -> bool
    """Decide whether a record should be kept.

    :param record: Logging record to filter
    :type record: logging.LogRecord
    :returns: ``False`` when the record's logger name is in the blacklist,
        ``True`` otherwise
    :rtype: bool
    """
    is_blacklisted = record.name in self.__blacklist
    return not is_blacklisted
def create_record(name='test_logger', level='DEBUG', msg='foobar'):
    """Build a bare ``LogRecord`` for tests; ``level`` is looked up in ``LEVEL_MAP``."""
    mapped_level = LEVEL_MAP[level]
    return LogRecord(
        name=name,
        level=mapped_level,
        msg=msg,
        lineno=None,
        args=None,
        exc_info=None,
        pathname=None,
    )

# LoggerFilter
def assertLogRecords(self, records, matches):
    """Assert that each record is a ``LogRecord`` carrying the expected attributes.

    ``matches`` is a sequence of dicts, one per record and in the same order;
    every key is an attribute name whose value on the record must equal the
    dict's value. The two sequences must have the same length.
    """
    self.assertEqual(len(records), len(matches))
    for record, expected_attrs in zip(records, matches):
        self.assertIsInstance(record, logging.LogRecord)
        for attr_name, expected_value in expected_attrs.items():
            actual_value = getattr(record, attr_name)
            self.assertEqual(actual_value, expected_value)
def __init__(self, records: typing.List[logging.LogRecord]):
    """Keep a reference to the given list of log records (the list is not copied)."""
    held_records = records
    self._records = held_records
def filter_by_level(self, minimum_level=logging.NOTSET):
    """Filter the records, keeping those at or above ``minimum_level``.

    The level is first normalised by ``self._sanitize_level``; the resulting
    predicate is handed to the private ``__filter`` helper, whose return
    value is returned unchanged.
    """
    threshold = self._sanitize_level(minimum_level)

    def filter_func(rec: logging.LogRecord):
        # Keep records whose numeric level reaches the threshold.
        return rec.levelno >= threshold

    return self.__filter(filter_func)
def _filter_by_str(self, text: str or None, rec_attrib):
    """Filter the records by a case-insensitive substring match on one attribute.

    Args:
        text: Text to look for; ``None`` disables the filter, in which case
            ``self`` is returned unchanged.
        rec_attrib: Name of the record attribute to search in.

    Returns:
        ``self`` when ``text`` is ``None``, otherwise whatever the private
        ``__filter`` helper returns for the predicate.
    """
    if text is None:
        return self

    # Lower-case the needle once, not once per record.
    needle = text.lower()

    def filter_func(rec: logging.LogRecord):
        # Both sides are lower-cased so a mixed-case search text can match a
        # mixed-case attribute; str() also tolerates non-string attributes.
        return needle in str(getattr(rec, rec_attrib)).lower()

    return self.__filter(filter_func)
def __iter__(self) -> typing.Generator[logging.LogRecord, None, None]:
    """Yield the held records one by one, in their stored order."""
    remaining = iter(self._records)
    for record in remaining:
        yield record
def handle_record(self, record: logging.LogRecord):
    """Process one log record; this base version always raises.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def emit(self, record: logging.LogRecord):
    """Store the record, then pass it to every follower's ``handle_record``."""
    self._records.append(record)
    followers = self._followers
    for listener in followers:
        listener.handle_record(record)
def records(self) -> typing.Iterator[logging.LogRecord]:
    """Return the records selected by the currently configured ``self.filters``."""
    active_filters = self.filters
    return self.filter_records(**active_filters)
def handle_record(self, record: logging.LogRecord):
    """Write the record to the log tab when it passes the configured filters.

    The formatted record is written together with the color registered for
    its level name (converted with ``str``). Records rejected by
    ``self.filter_record`` are ignored.
    """
    if not self.filter_record(record, **self.filters):
        return
    text = self.format(record)
    color = str(self.colors[record.levelname])
    I.tab_log_write(text, color)
def _send(self):
    """Upload the filtered, formatted records as a paste and report the outcome.

    On success the paste URL is sent via ``SENTRY.captureMessage`` and a
    thank-you line is written to the log tab; otherwise a failure line is
    written instead.
    """
    lines = []
    for record in self.filter_records():
        assert isinstance(record, logging.LogRecord)
        lines.append(self.format(record))

    paste_url = pastebin.create_new_paste('\n'.join(lines))
    if not paste_url:
        self.tab_log_write('Could not send log file')
        return

    SENTRY.captureMessage('Logfile', extra={'log_url': paste_url})
    self.tab_log_write('Log file sent; thank you !')
def shouldFlush(self, record):
    """Tell whether the buffer should be flushed automatically.

    :param logging.LogRecord record: log record to be considered

    :returns: always ``False``; the record is not inspected
    :rtype: bool

    """
    auto_flush = False
    return auto_flush
def emit(self, record):
    """Log the specified logging record.

    ``self._delete`` is set to ``False`` before the record is passed on to
    the parent handler.

    :param logging.LogRecord record: Record to be formatted

    """
    self._delete = False
    if sys.version_info >= (2, 7):
        super(TempHandler, self).emit(record)
    else:  # pragma: no cover
        # logging handlers use old style classes in Python 2.6 so
        # super() cannot be used
        logging.StreamHandler.emit(self, record)
def filter(self, record: logging.LogRecord):
    """Reject records whose ``exc_info`` is the empty ``(None, None, None)`` triple.

    Returns:
        ``False`` for such records, ``True`` for every other record
        (including records whose ``exc_info`` is ``None``).
    """
    has_empty_exc_info = record.exc_info == (None, None, None)
    return False if has_empty_exc_info else True
def __init__(self, level=logging.NOTSET):
    # type: (int) -> None
    """Initialise the handler with an empty record list.

    :param int level: level passed on to the parent handler's initialiser
    """
    super(MemoryLogHandler, self).__init__(level)
    # Starts at NOTSET.
    self.max_level_emitted = logging.NOTSET
    self._records = []  # type: List[logging.LogRecord]