Python logging 模块,LogRecord() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用logging.LogRecord()

项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def format(self, record):
        """Format the record, decorating sphinxcontrib.versioning messages.

        The record is first rendered by the parent formatter. In verbose mode,
        or when the logger name does not start with ``SPECIAL_SCOPE``, that
        text is returned untouched. Otherwise it is prefixed with ``=> `` and,
        when colors are enabled, tinted by severity: red for ERROR and above,
        yellow for WARNING and above, cyan for everything else.

        :param logging.LogRecord record: The log record object to log.

        :return: The formatted (and possibly decorated) message.
        """
        text = super(ColorFormatter, self).format(record)

        # Verbose output and foreign loggers are passed through unchanged.
        if self.verbose:
            return text
        if not record.name.startswith(self.SPECIAL_SCOPE):
            return text

        text = '=> ' + text
        if not self.colors:
            return text

        # Pick the color by severity, highest threshold first.
        if record.levelno >= logging.ERROR:
            paint = colorclass.Color.red
        elif record.levelno >= logging.WARNING:
            paint = colorclass.Color.yellow
        else:
            paint = colorclass.Color.cyan
        return str(paint(text))
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
        """
        Build the record for a logging call.

        A ``StructuredLogRecord`` is created when ``extra`` carries a
        ``log_props`` entry (named format arguments); every other call is
        delegated to the parent logger's ``makeRecord``.

        :param name: The name of the logger that produced the log record.
        :param level: The logging level (severity) associated with the logging record.
        :param fn: The name of the file (if known) where the log entry was created.
        :param lno: The line number (if known) in the file where the log entry was created.
        :param msg: The log message (or message template).
        :param args: Ordinal message format arguments (if any).
        :param exc_info: Exception information to be included in the log entry.
        :param func: The function (if known) where the log entry was created.
        :param extra: Extra information (if any) to add to the log record.
        :param sinfo: Stack trace information (if known) for the log entry.
        :return: The newly created record.
        """

        # Without named format arguments the stock record is good enough.
        if not extra or 'log_props' not in extra:
            return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)

        return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
        """
        Create the `LogRecord` for a logging call.

        When ``extra`` contains a ``log_props`` key, those properties are
        handed to a ``StructuredLogRecord``; otherwise the parent class
        builds the record.

        :param name: The name of the logger that produced the log record.
        :param level: The logging level (severity) associated with the logging record.
        :param fn: The name of the file (if known) where the log entry was created.
        :param lno: The line number (if known) in the file where the log entry was created.
        :param msg: The log message (or message template).
        :param args: Ordinal message format arguments (if any).
        :param exc_info: Exception information to be included in the log entry.
        :param func: The function (if known) where the log entry was created.
        :param extra: Extra information (if any) to add to the log record.
        :param sinfo: Stack trace information (if known) for the log entry.
        :return: The newly created record.
        """

        has_named_args = bool(extra) and 'log_props' in extra
        if has_named_args:
            named_args = extra['log_props']
            return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, named_args)

        return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def _get_local_timestamp(record):
    """
    Get the record's creation time, in the local time zone, as an ISO-formatted string.

    ``record.created`` is converted to a time-zone-aware ``datetime`` using
    ``tzlocal()`` and rendered with a space between the date and the time.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: The ISO-formatted date / time string.
    :rtype: str
    """

    local_time = datetime.fromtimestamp(record.created, tzlocal())
    return local_time.isoformat(sep=' ')
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __is_kms_encrypt_request(self, record):  # pylint: disable=no-self-use
        # type: (logging.LogRecord) -> bool
        """Tell whether a record describes a kms:Encrypt request.

        The record must come from the ``botocore.endpoint`` logger, its message
        must start with ``Making request`` and the last positional argument
        must carry an ``X-Amz-Target`` header equal to ``TrentService.Encrypt``.
        Any error raised while inspecting the record counts as "no".

        :param record: Logging record to filter
        :type record: logging.LogRecord
        :rtype: bool
        """
        try:
            from_endpoint = record.name == 'botocore.endpoint'
            is_request = record.msg.startswith('Making request')
            request_info = cast(tuple, record.args)[-1]
            targets_encrypt = request_info['headers']['X-Amz-Target'] == 'TrentService.Encrypt'
            return all((from_endpoint, is_request, targets_encrypt))
        except Exception:  # pylint: disable=broad-except
            return False
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __is_kms_response_with_plaintext(self, record):  # pylint: disable=no-self-use
        # type: (logging.LogRecord) -> bool
        """Tell whether a record holds a KMS response body that includes plaintext.

        The record must come from the ``botocore.parsers`` logger, its message
        must start with ``Response body:`` and its first positional argument
        must contain both ``b'KeyId'`` and ``b'Plaintext'``. Any error raised
        while inspecting the record counts as "no".

        :param record: Logging record to filter
        :type record: logging.LogRecord
        :rtype: bool
        """
        try:
            from_parser = record.name == 'botocore.parsers'
            is_response = record.msg.startswith('Response body:')
            body = cast(tuple, record.args)[0]
            has_key_id = b'KeyId' in body
            has_plaintext = b'Plaintext' in body
            return all((from_parser, is_response, has_key_id, has_plaintext))
        except Exception:  # pylint: disable=broad-except
            return False
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def __init__(self, name, level, fn, lno, msg, args, exc_info, func, extra):
        """Create a log record enriched with the values from ``get_log_details()``.

        The mapping returned by ``get_log_details()`` is updated with ``extra``
        and every entry is copied onto the record as an attribute. Its
        ``"logger"`` entry is then filled with the standard record fields, the
        current correlation id (``None`` when it cannot be read) and a UTC
        creation timestamp, and the entries are copied onto the record again.

        :param name: Logger name.
        :param level: Numeric logging level.
        :param fn: Path of the source file that issued the logging call.
        :param lno: Line number of the logging call.
        :param msg: Log message (or format string).
        :param args: Arguments merged into ``msg``.
        :param exc_info: Exception tuple, or None.
        :param func: Name of the function that issued the logging call.
        :param extra: Optional mapping merged over the log details.
        """
        logging.LogRecord.__init__(self, name, level, fn, lno, msg, args, exc_info, func)
        log_details = get_log_details()
        log_details.update(extra or {})
        # Iterate the mapping directly: ``dict.iterkeys()`` only exists on
        # Python 2, while plain iteration yields the keys on both 2 and 3.
        for key in log_details:
            setattr(self, key, log_details[key])
        logger_fields = "levelname", "levelno", "process", "thread", "name", \
                        "filename", "module", "funcName", "lineno"
        for field in logger_fields:
            log_details["logger"][field] = getattr(self, field, None)
        # NOTE(review): ``request`` is presumably a web-framework request proxy
        # that raises outside of a request context -- confirm.
        try:
            correlation_id = request.correlation_id
        except Exception:
            correlation_id = None

        log_details["logger"]["correlation_id"] = correlation_id
        log_details["logger"]["created"] = datetime.datetime.utcnow().isoformat() + "Z"
        # Copy again so the attributes reflect the values filled in above.
        for key in log_details:
            setattr(self, key, log_details[key])
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def test_custom_handler(self, mocker):
        """A handler installed on the logger receives the emitted DEBUG record."""
        emit_mock = mocker.MagicMock()
        handler = DummyHandler()
        handler.emit = emit_mock

        logger = make_logger()
        logger.handlers = [handler]

        disable(NOTSET)
        logger.debug('test')

        # Exactly one call, made on the mock itself, with the record as the
        # first positional argument.
        assert emit_mock.call_count == 1
        name, args, kwargs = emit_mock.mock_calls[0]
        assert name == ''
        record = args[0]
        assert isinstance(record, LogRecord)
        assert record.msg == 'test'
        assert record.levelname == 'DEBUG'

        del logger
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def format(self, record):
        """Return the formatted record, wrapped in red when it is severe enough.

        :param logging.LogRecord record: Record to be formatted

        :returns: Formatted, string representation of record
        :rtype: str

        """
        # On Python < 2.7 the base class method is called directly instead of
        # going through super().
        if sys.version_info < (2, 7):
            out = logging.StreamHandler.format(self, record)
        else:
            out = super(ColoredStreamHandler, self).format(record)

        if not (self.colored and record.levelno >= self.red_level):
            return out
        return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
项目:exchangelib    作者:ecederstrand    | 项目源码 | 文件源码
def test_pretty_xml_handler(self):
        """Check PrettyXmlHandler output for a plain record and for an XML record.

        In both cases the stream's ``isatty`` is patched to report a terminal.
        """
        # Test that a normal, non-XML log record is passed through unchanged
        stream = io.BytesIO() if PY2 else io.StringIO()
        stream.isatty = lambda: True
        h = PrettyXmlHandler(stream=stream)
        self.assertTrue(h.is_tty())
        r = logging.LogRecord(name='baz', level=logging.INFO, pathname='/foo/bar', lineno=1, msg='hello', args=(), exc_info=None)
        h.emit(r)
        # Rewind so the assertion reads everything the handler wrote.
        h.stream.seek(0)
        self.assertEqual(h.stream.read(), 'hello\n')

        # Test formatting of an XML record. It should contain newlines and color codes.
        stream = io.BytesIO() if PY2 else io.StringIO()
        stream.isatty = lambda: True
        h = PrettyXmlHandler(stream=stream)
        r = logging.LogRecord(name='baz', level=logging.DEBUG, pathname='/foo/bar', lineno=1, msg='hello %(xml_foo)s', args=({'xml_foo': b'<?xml version="1.0" encoding="UTF-8"?><foo>bar</foo>'},), exc_info=None)
        h.emit(r)
        h.stream.seek(0)
        self.assertEqual(
            h.stream.read(),
            "hello \x1b[36m<?xml version='1.0' encoding='utf-8'?>\x1b[39;49;00m\n\x1b[34;01m<foo\x1b[39;49;00m\x1b[34;01m>\x1b[39;49;00mbar\x1b[34;01m</foo>\x1b[39;49;00m\n\n"
        )
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def test_record_none_exc_info(django_elasticapm_client):
    """A record whose exc_info is ``(None, None, None)`` yields an event without exception data."""
    # sys.exc_info can return (None, None, None) if no exception is being
    # handled anywhere on the stack. See:
    #  http://docs.python.org/library/sys.html#sys.exc_info
    empty_exc_info = (None, None, None)
    record = logging.LogRecord(
        name='foo',
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg='test',
        args=(),
        exc_info=empty_exc_info,
    )
    LoggingHandler().emit(record)

    assert len(django_elasticapm_client.events) == 1
    event = django_elasticapm_client.events.pop(0)['errors'][0]

    log_data = event['log']
    assert log_data['param_message'] == 'test'
    assert log_data['logger_name'] == 'foo'
    assert log_data['level'] == 'info'
    assert 'exception' not in event
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def format(self, record):
        """ Styles ``record.msg`` with click.style() and returns the record.

            Keyword arguments given to a logging call end up as attributes on
            the record. Every attribute whose name is listed in STYLES is
            collected and passed to click.style(); if none is present the
            message is left untouched.

        Args:
            record (logging.LogRecord): record which gets styled with click.style()

        Returns:
            The same record object, with ``msg`` styled when any style
            attribute was found.
        """
        # Gather the click style options that were attached to the record.
        style_options = {
            option: getattr(record, option)
            for option in self.STYLES
            if hasattr(record, option)
        }

        if style_options:
            record.msg = click.style(record.msg, **style_options)

        return record
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def terminate(self):
        """Stop the process server and close the UI channel and the event queue.

        Does nothing when ``self.terminated`` is already set, so repeated calls
        are harmless.
        """
        if self.terminated:
            return
        self.terminated = True
        def flushevents():
            # Drain whatever is queued right now without blocking. Only
            # LogRecord events are forwarded to ``logger``; anything else taken
            # from the queue is discarded.
            while True:
                try:
                    event = self.event_queue.get(block=False)
                except (Empty, IOError):
                    break
                if isinstance(event, logging.LogRecord):
                    logger.handle(event)

        # From here on SIGINT is ignored.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.procserver.stop()

        # Keep draining events while waiting (0.1s at a time) for the process
        # server to exit.
        while self.procserver.is_alive():
            flushevents()
            self.procserver.join(0.1)

        self.ui_channel.close()
        self.event_queue.close()
        self.event_queue.setexit()

# Wrap Queue to provide API which isn't server implementation specific
项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def test_connector_log_formatter_dict_format(self):
        """Check ConnectorLogFormatter output when the record message is a dict."""
        formatter = ConnectorLogFormatter()
        message = {'message': 'fake_message', 'type': 'message_type'}
        record = LogRecord('fake_name', 'DEBUG', None, None, message,
                           None, None)
        record.reseller_name = 'fake_reseller_name'
        # as return result of formatter.format is a string we will check the substring
        actual_record = formatter.format(record)
        assert 'fake_reseller_name' in actual_record
        assert 'MESSAGE_TYPE' in actual_record
        # NOTE(review): this compares against the current wall-clock time with the
        # last 7 characters (presumably the ".ffffff" microseconds) cut off, so it
        # looks sensitive to a second rollover between format() and now() -- confirm.
        assert datetime.now().isoformat(' ')[:-7] in actual_record
        # Re-format the same record with a different dict payload.
        record.msg = {
            'text': 'fake_text'
        }
        actual_record = formatter.format(record)
        assert 'fake_text' in actual_record
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        """Install a root-logger filter that only accepts ``DerivedLogRecord`` instances.

        The record factory that is active at this point is remembered in
        ``self.orig_factory``.
        """
        # Filter that raises TypeError for any record whose exact type is not
        # the class it was constructed with.
        class CheckingFilter(logging.Filter):
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                record_type = type(record)
                if record_type is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (record_type, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        self.orig_factory = logging.getLogRecordFactory()
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def format(self, record):
        """
        Render the record through ``self.fmt`` with HTML-escaped fields.

        The parent formatter runs first; ``funcName``, ``name`` and the
        interpolated message are then HTML-escaped in place on the record.
        When ``self.use_emoji`` is set, an emoji is appended to ``levelname``:
        white circle for DEBUG, blue circle for INFO, red circle otherwise.

        :param logging.LogRecord record:
        :return: ``self.fmt`` interpolated with the record's attributes.
        """
        super(HtmlFormatter, self).format(record)

        if record.funcName:
            record.funcName = escape_html(str(record.funcName))
        if record.name:
            record.name = escape_html(str(record.name))
        if record.msg:
            # NOTE(review): ``msg`` now holds the already-interpolated text while
            # ``args`` is left in place -- confirm no later getMessage() call
            # interpolates it a second time.
            record.msg = escape_html(record.getMessage())
        if self.use_emoji:
            # No diagnostic printing here: a formatter must not write to stdout.
            if record.levelno == logging.DEBUG:
                record.levelname += ' ' + EMOJI.WHITE_CIRCLE
            elif record.levelno == logging.INFO:
                record.levelname += ' ' + EMOJI.BLUE_CIRCLE
            else:
                record.levelname += ' ' + EMOJI.RED_CIRCLE

        return self.fmt % record.__dict__
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def format(self, record):
        """Return the formatted record, wrapped in red when it is severe enough.

        :param logging.LogRecord record: Record to be formatted

        :returns: Formatted, string representation of record
        :rtype: str

        """
        # On Python < 2.7 the base class method is called directly instead of
        # going through super().
        if sys.version_info < (2, 7):
            out = logging.StreamHandler.format(self, record)
        else:
            out = super(StreamHandler, self).format(record)

        if not (self.colored and record.levelno >= self.red_level):
            return out
        return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        """Install a root-logger filter that only accepts ``DerivedLogRecord`` instances.

        The record factory that is active at this point is remembered in
        ``self.orig_factory``.
        """
        # Filter that raises TypeError for any record whose exact type is not
        # the class it was constructed with.
        class CheckingFilter(logging.Filter):
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                record_type = type(record)
                if record_type is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (record_type, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        self.orig_factory = logging.getLogRecordFactory()
项目:foil    作者:portfoliome    | 项目源码 | 文件源码
def test_json_formatter(self):
        """JSONFormatter output parses as JSON and carries the record's fields."""
        name, line = 'name', 42
        module, func = 'some_module', 'some_function'
        msg = {'content': 'sample log'}

        formatter = JSONFormatter()
        log_record = LogRecord(name, INFO, module, line, msg, None, None,
                               func=func)

        result = json.loads(formatter.format(log_record))

        # check some of the fields to ensure json formatted correctly
        self.assertEqual(name, result['name'])
        self.assertEqual(line, result['lineNumber'])
        self.assertEqual(func, result['functionName'])
        self.assertEqual(module, result['module'])
        self.assertEqual('INFO', result['level'])
        self.assertEqual(msg, result['message'])
项目:dcos-e2e    作者:mesosphere    | 项目源码 | 文件源码
def _two_masters_error_logged(
        self,
        log_records: List[logging.LogRecord],
    ) -> bool:
        """
        Tell whether the invalid-master-count error shows up as a DEBUG message.

        This is prone to being broken as it checks for a string in the DC/OS
        repository.

        Args:
            log_records: Messages logged from the logger.

        Returns:
            True if at least one DEBUG record contains the error text.
        """
        message = 'Must have 1, 3, 5, 7, or 9 masters'
        # Every record is evaluated before the results are combined.
        matches = [
            record.levelno == logging.DEBUG and message in str(record.msg)
            for record in log_records
        ]
        return any(matches)
项目:qudi    作者:Ulm-IQO    | 项目源码 | 文件源码
def format(self, record):
        """Turn a log record into a dictionary.

          The result always has the keys 'name', 'timestamp', 'level' and
          'message'. The message is taken from ``record.message`` when that
          attribute exists, otherwise from the parent formatter. When the
          record carries exception information, an 'exception' entry with the
          final line of the formatted exception ('message') and all preceding
          lines ('traceback') is added.

          @param object record: :logging.LogRecord:

          @return dict: the dictionary describing the record
        """
        entry = dict(
            name=record.name,
            timestamp=self.formatTime(record, datefmt="%Y-%m-%d %H:%M:%S"),
            level=record.levelname,
        )
        entry['message'] = (record.message if hasattr(record, 'message')
                            else super().format(record))

        if record.exc_info is None:
            return entry

        # add exception information; the last formatted line (without its
        # trailing newline) is the message, everything before it the traceback
        exc_lines = traceback.format_exception(*record.exc_info)
        entry['exception'] = {
            'message': exc_lines[-1][:-1],
            'traceback': exc_lines[:-1],
        }
        return entry
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        """Install a root-logger filter that only accepts ``DerivedLogRecord`` instances.

        The record factory that is active at this point is remembered in
        ``self.orig_factory``.
        """
        # Filter that raises TypeError for any record whose exact type is not
        # the class it was constructed with.
        class CheckingFilter(logging.Filter):
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                record_type = type(record)
                if record_type is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (record_type, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        self.orig_factory = logging.getLogRecordFactory()
项目:endpoints-python    作者:cloudendpoints    | 项目源码 | 文件源码
def logMessages(self, request):
    """Relay log messages sent by the Swarm FE to this module's logger.

    Args:
      request: A log message request.

    Returns:
      Void message.
    """
    level_enum = api_backend.LogMessagesRequest.LogMessage.Level
    backend_logger = logging.getLogger(__name__)
    for message in request.messages:
      # Messages without a level are logged at the info level.
      if message.level is None:
        level = level_enum.info
      else:
        level = message.level
      # Create a log record and override the pathname and lineno.  These
      # messages come from the front end, so it's misleading to say that they
      # come from api_backend_service.
      frontend_record = logging.LogRecord(
          name=__name__, level=level.number, pathname='', lineno='',
          msg=message.message, args=None, exc_info=None)
      backend_logger.handle(frontend_record)

    return message_types.VoidMessage()
项目:openadms-node    作者:dabamos    | 项目源码 | 文件源码
def format(self, record: logging.LogRecord) -> str:
        """Return formatted string of log record.

        The line has the form ``<time> - <level> - <logger name> - <message>``
        with the level right-aligned to 8 and the logger name to 26 characters.
        ``record.asctime`` and ``record.message`` are reused when they are
        already set; otherwise they are computed and stored on the record.

        Args:
            record: The log record.

        Returns:
            Formatted string of log record.
        """
        if not hasattr(record, 'asctime'):
            record.asctime = self.formatTime(record, self.datefmt)

        if not hasattr(record, 'message'):
            # getMessage() merges ``record.args`` into the message, so calls
            # such as ``log.info('x %s', y)`` are rendered completely instead
            # of printing the raw template.
            record.message = record.getMessage()

        return '{} - {:>8} - {:>26} - {}'.format(record.asctime,
                                                 record.levelname,
                                                 record.name,
                                                 record.message)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        """Install a root-logger filter that only accepts ``DerivedLogRecord`` instances.

        The record factory that is active at this point is remembered in
        ``self.orig_factory``.
        """
        # Filter that raises TypeError for any record whose exact type is not
        # the class it was constructed with.
        class CheckingFilter(logging.Filter):
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                record_type = type(record)
                if record_type is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (record_type, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        self.orig_factory = logging.getLogRecordFactory()
项目:lago    作者:lago-project    | 项目源码 | 文件源码
def handle_new_task(self, task_name, record):
        """
        Register a starting task and, if it is visible, print its header.

        The record's message is replaced by the colored start-of-task text and
        the task name is stored on the record as ``task``. A new ``Task``
        limited to ``self.buffer_size`` entries is stored under the task name.

        Params:
            task_name (str): name of the task that is starting
            record (logging.LogRecord): log record with all the info

        Returns:
            None
        """
        record.msg = ColorFormatter.colored('default', START_TASK_MSG)
        record.task = task_name

        new_task = Task(name=task_name, maxlen=self.buffer_size)
        self.tasks[task_name] = new_task

        if not self.should_show_by_depth():
            return
        self.pretty_emit(record, is_header=True)
项目:pyblish-lite    作者:pyblish    | 项目源码 | 文件源码
def test_label_nonstring():
    """Logging things that aren't string is fine"""

    messages = (
        "Proper message",
        12,
        {"a": "dict"},
        [],
        1.0,
    )
    records = [
        logging.LogRecord("root", "INFO", "", 0, msg, [], None)
        for msg in messages
    ]
    result = {"records": records, "error": None}

    model_ = model.Terminal()
    model_.update_with_result(result)

    # Whatever was logged, the label shown for it must be text.
    for item in model_:
        label = item.data(model.Label)
        assert isinstance(label, six.text_type), (
            "\"%s\" wasn't a string!" % item.data(model.Label))
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def emit(self, record):
        """
        Emit a log record by placing it on ``self.log_queue``.

        The record is enqueued without blocking.

        :param record: The LogRecord.
        """

        self.log_queue.put(record, block=False)
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def _build_event_data(record):
    """
    Build the event data dictionary that is submitted to Seq for a log record.

    The message template and properties depend on the kind of record:

    * records with positional arguments use the interpolated message and the
      mapping returned by ``get_global_log_properties`` extended with each
      argument under its 0-based index (as a string);
    * ``StructuredLogRecord`` instances use the raw message and their
      ``log_props``;
    * all other records use the interpolated message and ``_global_log_props``.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """

    if record.args:
        # Standard (unnamed) format arguments (use 0-base index as property name).
        properties = get_global_log_properties(record.name)
        for position, value in enumerate(record.args):
            properties[str(position)] = value
        template = record.getMessage()
    elif isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        template = record.msg
        properties = record.log_props
    else:
        # No format arguments; interpret message as-is.
        template = record.getMessage()
        properties = _global_log_props

    return {
        "Timestamp": _get_local_timestamp(record),
        "Level": logging.getLevelName(record.levelno),
        "MessageTemplate": template,
        "Properties": properties,
    }
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __redact_encrypt_request(self, record):
        # type: (logging.LogRecord) -> None
        """Replace the ``Plaintext`` value in a kms:Encrypt request body.

        The request description is the last positional argument of the record;
        its JSON ``body`` is parsed, redacted and written back (with sorted
        keys). If anything goes wrong the record is left as it is.

        :param record: Logging record to filter
        :type record: logging.LogRecord
        """
        try:
            request_info = cast(tuple, record.args)[-1]
            body = json.loads(self.__to_str(request_info['body']))
            body['Plaintext'] = _REDACTED
            request_info['body'] = json.dumps(body, sort_keys=True)
        except Exception:  # pylint: disable=broad-except
            return
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __redact_key_from_response(self, record):
        # type: (logging.LogRecord) -> None
        """Replace the ``Plaintext`` value in a KMS response body.

        The response body is the first positional argument of the record; it
        is parsed as JSON, redacted and re-serialized (with sorted keys), and
        ``record.args`` is replaced by a new tuple holding the redacted body
        followed by the remaining arguments. If anything goes wrong the record
        is left as it is.

        :param record: Logging record to filter
        :type record: logging.LogRecord
        """
        try:
            original_args = cast(tuple, record.args)
            body = json.loads(self.__to_str(original_args[0]))
            body['Plaintext'] = _REDACTED
            redacted_body = json.dumps(body, sort_keys=True)
            record.args = (redacted_body,) + original_args[1:]
        except Exception:  # pylint: disable=broad-except
            return
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __redact_record(self, record):
        # type: (logging.LogRecord) -> logging.LogRecord
        """Return a redacted deep copy of a record.

        The copy is redacted as a kms:Encrypt request or, failing that, as a
        KMS response containing plaintext; the record passed in is never
        modified.

        :param record: Logging record to filter
        :type record: logging.LogRecord
        :return: The (possibly redacted) copy of the record
        """
        clone = copy.deepcopy(record)
        if self.__is_kms_encrypt_request(clone):
            self.__redact_encrypt_request(clone)
            return clone
        if self.__is_kms_response_with_plaintext(clone):
            self.__redact_key_from_response(clone)
        return clone
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def filter(self, record):
        # type: (logging.LogRecord) -> bool
        """Decide whether a record may pass.

        Records are rejected when their logger name is listed in the blacklist.

        :param record: Logging record to filter
        :type record: logging.LogRecord
        :rtype: bool
        """
        is_blacklisted = record.name in self.__blacklist
        return not is_blacklisted
项目:belogging    作者:georgeyk    | 项目源码 | 文件源码
def create_record(name='test_logger', level='DEBUG', msg='foobar'):
    """Build a bare LogRecord whose level is looked up in ``LEVEL_MAP``."""
    mapped_level = LEVEL_MAP[level]
    return LogRecord(name=name, level=mapped_level, pathname=None,
                     lineno=None, msg=msg, args=None, exc_info=None)

# LoggerFilter
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def assertLogRecords(self, records, matches):
        """Assert that each record is a LogRecord carrying the expected attribute values.

        ``records`` and ``matches`` must have the same length; the n-th entry
        of ``matches`` maps attribute names to the values expected on the n-th
        record.
        """
        self.assertEqual(len(records), len(matches))
        for record, expected in zip(records, matches):
            self.assertIsInstance(record, logging.LogRecord)
            for attr_name, attr_value in expected.items():
                self.assertEqual(getattr(record, attr_name), attr_value)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def __init__(self, records: typing.List[logging.LogRecord]):
        """Keep a reference to ``records``; the list is stored as given, not copied."""
        self._records = records
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def filter_by_level(self, minimum_level=logging.NOTSET):
        """Select the records whose level is at least ``minimum_level``.

        The level is normalized with ``_sanitize_level`` before comparing.

        :param minimum_level: lowest level to keep
        :return: whatever the underlying filter helper returns for the predicate
        """
        threshold = self._sanitize_level(minimum_level)
        return self.__filter(lambda rec: rec.levelno >= threshold)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def _filter_by_str(self, text: str or None, rec_attrib):
        """Select the records whose attribute contains ``text``, ignoring case.

        :param text: substring to look for; ``None`` disables the filter
        :param rec_attrib: name of the record attribute to search in
        :return: ``self`` when ``text`` is None, otherwise whatever the
            underlying filter helper returns for the predicate
        """
        if text is None:
            return self

        # Lower-case the needle once; the haystack is lower-cased as well,
        # otherwise mixed-case record text could never match.
        needle = text.lower()

        def filter_func(rec: logging.LogRecord):
            return needle in str(getattr(rec, rec_attrib)).lower()

        return self.__filter(filter_func)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def __iter__(self) -> typing.Generator[logging.LogRecord, None, None]:
        """Yield the stored records one by one, in order."""
        yield from self._records
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def handle_record(self, record: logging.LogRecord):
        """Handle a single record; this version always raises NotImplementedError.

        Presumably meant to be overridden by concrete followers -- confirm.
        """
        raise NotImplementedError()
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def emit(self, record: logging.LogRecord):
        """Store the record and pass it to every registered follower."""
        self._records.append(record)
        for listener in self._followers:
            listener.handle_record(record)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def records(self) -> typing.Iterator[logging.LogRecord]:
        """Return the result of ``filter_records`` called with the current ``self.filters``."""
        return self.filter_records(**self.filters)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def handle_record(self, record: logging.LogRecord):
        """Write the record to the log tab when it passes the current filters.

        The formatted text is written together with the color configured for
        the record's level name.
        """
        if not self.filter_record(record, **self.filters):
            return
        text = self.format(record)
        color = str(self.colors[record.levelname])
        I.tab_log_write(text, color)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def _send(self):
        """Upload the filtered log as a paste and report the outcome in the log tab.

        On success the paste URL is also sent along with a 'Logfile' message
        via ``SENTRY.captureMessage``.
        """
        lines = []
        for record in self.filter_records():
            assert isinstance(record, logging.LogRecord)
            lines.append(self.format(record))

        paste_url = pastebin.create_new_paste('\n'.join(lines))
        if not paste_url:
            self.tab_log_write('Could not send log file')
            return

        SENTRY.captureMessage('Logfile', extra={'log_url': paste_url})
        self.tab_log_write('Log file sent; thank you !')
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def shouldFlush(self, record):
        """Should the buffer be automatically flushed?

        The record is not inspected; the answer is always the same.

        :param logging.LogRecord record: log record to be considered

        :returns: False because the buffer should never be auto-flushed
        :rtype: bool

        """
        return False
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def emit(self, record):
        """Log the specified logging record.

        ``self._delete`` is set to False before the record is passed on to the
        base handler.

        :param logging.LogRecord record: Record to be formatted

        """
        self._delete = False
        if sys.version_info >= (2, 7):
            super(TempHandler, self).emit(record)
        else:  # pragma: no cover
            # logging handlers use old style classes in Python 2.6 so
            # super() cannot be used
            logging.StreamHandler.emit(self, record)
项目:midi    作者:MicroTransactionsMatterToo    | 项目源码 | 文件源码
def filter(self, record: logging.LogRecord):
        """Reject records whose ``exc_info`` is the empty ``(None, None, None)`` tuple.

        Every other record passes.
        """
        has_empty_exc_info = record.exc_info == (None, None, None)
        return not has_empty_exc_info
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, level=logging.NOTSET):
        # type: (int) -> None
        """Create a handler with no stored records and ``max_level_emitted`` at NOTSET.

        :param level: handler level, passed on to the base class
        """
        super(MemoryLogHandler, self).__init__(level)

        self.max_level_emitted = logging.NOTSET
        self._records = []  # type: List[logging.LogRecord]