我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 logging.makeLogRecord()。
def _handle_logging(self, level, message):
    """
    Handles an RPC logging request.

    A log record is assembled by hand and passed straight to the base file
    handler exposed by ``sgtk.LogManager()``, when such a handler is set.

    :param str level: One of "debug", "info", "warning", or "error".
    :param str message: The log message.
    """
    # Build the record manually. The logger name gets a '.js' suffix to
    # indicate that the message came from javascript.
    record_fields = {
        "levelname": level.upper(),
        "name": "%s.js" % (self.logger.name,),
        "msg": message,
    }
    js_record = logging.makeLogRecord(record_fields)

    # Without a base file handler there is nowhere to forward the record.
    if not sgtk.LogManager().base_file_handler:
        return

    sgtk.LogManager().base_file_handler.handle(js_record)
def handle(self):
    '''
    Handle multiple requests - each expected to be a 4-byte length,
    followed by the LogRecord in pickle format.

    Reads from ``self.connection`` until fewer than four header bytes
    arrive. Each complete payload is decoded with ``self.unPickle``,
    turned into a record with ``logging.makeLogRecord`` and passed to
    ``self.handleLogRecord``.

    If the peer disconnects in the middle of a payload, the partial data
    is dropped and the method returns instead of spinning forever.
    '''
    # NOTE(review): unPickle presumably unpickles network data - confirm
    # that only trusted peers can connect.
    while True:
        header = self.connection.recv(4)
        if len(header) < 4:
            break
        slen = struct.unpack('>L', header)[0]
        payload = self.connection.recv(slen)
        while len(payload) < slen:
            more = self.connection.recv(slen - len(payload))
            if not more:
                # An empty read means the connection was closed; without
                # this check the loop would never terminate.
                return
            payload = payload + more
        obj = self.unPickle(payload)
        record = logging.makeLogRecord(obj)
        self.handleLogRecord(record)
def test_multiline_formatter():
    """Check MultilineFormatter output for one-line and three-line messages."""
    import logging

    from ldap2pg.config import MultilineFormatter

    formatter = MultilineFormatter("prefix: %(message)s")

    def make_record(text):
        # Same attribute set for every record; only the message differs.
        return logging.makeLogRecord(dict(
            name='pouet',
            level=logging.DEBUG,
            fn="(unknown file)",
            lno=0,
            args=(),
            exc_info=None,
            msg=text,
        ))

    assert "prefix: single line" == formatter.format(make_record("single line"))

    # Each line of the message is expected to carry the prefix.
    wanted = "\n".join(["prefix: Uno", "prefix: Dos", "prefix: Tres"])
    payload = formatter.format(make_record("Uno\nDos\nTres"))
    assert wanted == payload
def read_logs(self):
    """
    Receive pickled log records as datagrams and re-dispatch them locally.

    Each datagram is expected to start with a 4-byte big-endian length
    prefix followed by a pickled dict of record attributes. Records whose
    ``levelno`` is at least ``self.level`` are handed to the logger named
    in the record.

    The loop only ends when an exception is raised; the error is printed
    and ``self.socket`` is always closed on the way out.
    """
    try:
        while True:
            datagram = self.socket.recv(8192)
            # The prefix value itself is not used, but unpacking it raises
            # struct.error when the datagram is shorter than four bytes.
            struct.unpack(">L", datagram[0:4])
            # SECURITY: unpickling data read from a socket can execute
            # arbitrary code - only use this on a trusted network.
            obj = cPickle.loads(datagram[4:])
            record = logging.makeLogRecord(obj)
            if record.levelno >= self.level:
                logging.getLogger(record.name).handle(record)
    except Exception as e:
        # Best-effort reader: report the failure and stop. The single
        # parenthesised argument prints identically on Python 2 and 3.
        print("ERROR: " + str(e))
    finally:
        self.socket.close()
def test_logger_format(self):
    """Test: CONFIG: logger format."""
    fmt = '%(name)s - %(levelname)s - %(message)s'

    config = SRBConfig(logger)
    config.logger_add_stream_handler()
    config.logger_add_file_handler('logs.log')
    config.logger_format = fmt

    # Reference formatter built from the same format string.
    expected_formatter = logging.Formatter(fmt)

    record_attrs = dict(
        name='test_logger',
        level=logging.DEBUG,
        pathname=os.path.realpath(__file__),
        lineno=42,
        msg='test_msg',
        args=None,
        exc_info=None,
        func='test_logger_format',
    )
    rec = logging.makeLogRecord(record_attrs)

    self.assertEqual(expected_formatter.format(rec),
                     config.logger_stream_handler.format(rec))
def handle(self):
    """Handle multiple requests - each expected to be a 4-byte length,
    followed by the LogRecord in pickle format. Logs the record
    according to whatever policy is configured locally.

    Frames are read from ``self.connection`` until fewer than four header
    bytes arrive. Each payload is decoded with ``self.unpickle`` and the
    resulting record is passed to ``self.handle_log_record``. A connection
    that closes part-way through a payload ends the method; the
    incomplete payload is discarded.
    """
    # NOTE(review): unpickle presumably unpickles network data - confirm
    # that only trusted peers can connect.
    while True:
        prefix = self.connection.recv(4)
        if len(prefix) < 4:
            break
        slen = struct.unpack('>L', prefix)[0]
        body = self.connection.recv(slen)
        while len(body) < slen:
            piece = self.connection.recv(slen - len(body))
            if not piece:
                # recv() returning nothing means the peer is gone; bail
                # out rather than looping forever on empty reads.
                return
            body = body + piece
        obj = self.unpickle(body)
        record = logging.makeLogRecord(obj)
        self.handle_log_record(record)
def test_error_handling(self):
    """Exercise handler error reporting when the underlying stream fails.

    Covers a handler subclass that records the failing record, the stock
    StreamHandler writing a traceback to stderr, and the silent path when
    ``logging.raiseExceptions`` is False.
    """
    record = logging.makeLogRecord({})
    handler = TestStreamHandler(BadStream())
    saved_raise = logging.raiseExceptions
    saved_stderr = sys.stderr
    try:
        handler.handle(record)
        self.assertIs(handler.error_record, record)

        handler = logging.StreamHandler(BadStream())
        captured = io.StringIO()
        sys.stderr = captured
        handler.handle(record)
        self.assertIn('\nRuntimeError: deliberate mistake\n',
                      captured.getvalue())

        logging.raiseExceptions = False
        captured = io.StringIO()
        sys.stderr = captured
        handler.handle(record)
        self.assertEqual('', captured.getvalue())
    finally:
        # Restore global state regardless of the outcome.
        logging.raiseExceptions = saved_raise
        sys.stderr = saved_stderr

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
def test_basic(self):
    """Send one record through SMTPHandler to a local test server and
    check the envelope and body of the message it receives."""
    sockmap = {}
    server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
                            sockmap)
    server.start()

    smtp_handler = logging.handlers.SMTPHandler(
        ('localhost', server.port), 'me', 'you', 'Log', timeout=5.0)
    self.assertEqual(smtp_handler.toaddrs, ['you'])

    self.messages = []
    record = logging.makeLogRecord({'msg': 'Hello'})
    self.handled = threading.Event()
    smtp_handler.handle(record)
    self.handled.wait(5.0)  # 14314: don't wait forever
    server.stop()

    self.assertTrue(self.handled.is_set())
    self.assertEqual(len(self.messages), 1)
    _peer, sender, recipients, body = self.messages[0]
    self.assertEqual(sender, 'me')
    self.assertEqual(recipients, ['you'])
    self.assertIn('\nSubject: Log\n', body)
    self.assertTrue(body.endswith('\n\nHello'))
    smtp_handler.close()
def test_basic(self):
    """Send one non-ASCII record through SMTPHandler to a local test
    server and check the envelope and body of the received message."""
    sockmap = {}
    server = TestSMTPServer((HOST, 0), self.process_message, 0.001,
                            sockmap)
    server.start()

    smtp_handler = logging.handlers.SMTPHandler(
        (HOST, server.port), 'me', 'you', 'Log', timeout=self.TIMEOUT)
    self.assertEqual(smtp_handler.toaddrs, ['you'])

    self.messages = []
    record = logging.makeLogRecord({'msg': 'Hello \u2713'})
    self.handled = threading.Event()
    smtp_handler.handle(record)
    self.handled.wait(self.TIMEOUT)  # 14314: don't wait forever
    server.stop()

    self.assertTrue(self.handled.is_set())
    self.assertEqual(len(self.messages), 1)
    _peer, sender, recipients, body = self.messages[0]
    self.assertEqual(sender, 'me')
    self.assertEqual(recipients, ['you'])
    self.assertIn('\nSubject: Log\n', body)
    self.assertTrue(body.endswith('\n\nHello \u2713'))
    smtp_handler.close()
def handle(self):
    """
    Handle multiple requests - each expected to be a 4-byte length,
    followed by the LogRecord in pickle format. Logs the record
    according to whatever policy is configured locally.

    The loop stops when fewer than four header bytes can be read from
    ``self.connection``. It also stops, dropping the partial payload,
    when the connection closes before a full payload has arrived.
    """
    # NOTE(review): unPickle presumably unpickles network data - confirm
    # that only trusted peers can connect.
    while True:
        length_bytes = self.connection.recv(4)
        if len(length_bytes) < 4:
            break
        slen = struct.unpack('>L', length_bytes)[0]
        data = self.connection.recv(slen)
        while len(data) < slen:
            extra = self.connection.recv(slen - len(data))
            if not extra:
                # Closed socket: recv() keeps returning an empty string,
                # so give up instead of looping indefinitely.
                return
            data = data + extra
        obj = self.unPickle(data)
        record = logging.makeLogRecord(obj)
        self.handleLogRecord(record)
def test_basic(self):
    """Send one record through SMTPHandler to a local test server and
    check the envelope and body of the message it receives."""
    sockmap = {}
    server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
                            sockmap)
    server.start()

    smtp_handler = logging.handlers.SMTPHandler(
        ('localhost', server.port), 'me', 'you', 'Log',
        timeout=self.TIMEOUT)
    self.assertEqual(smtp_handler.toaddrs, ['you'])

    self.messages = []
    record = logging.makeLogRecord({'msg': 'Hello'})
    self.handled = threading.Event()
    smtp_handler.handle(record)
    self.handled.wait(self.TIMEOUT)  # 14314: don't wait forever
    server.stop()

    self.assertTrue(self.handled.is_set())
    self.assertEqual(len(self.messages), 1)
    _peer, sender, recipients, body = self.messages[0]
    self.assertEqual(sender, 'me')
    self.assertEqual(recipients, ['you'])
    self.assertIn('\nSubject: Log\n', body)
    self.assertTrue(body.endswith('\n\nHello'))
    smtp_handler.close()
def test_color_handler():
    """Check that ColoredStreamHandler output contains an ANSI escape."""
    import logging

    from ldap2pg.config import ColoredStreamHandler

    record_attrs = dict(
        name='pouet',
        level=logging.DEBUG,
        fn="(unknown file)",
        msg="Message",
        lno=0,
        args=(),
        exc_info=None,
    )
    record = logging.makeLogRecord(record_attrs)

    formatted = ColoredStreamHandler().format(record)
    assert "\033[0" in formatted
def update(self, frame, events):
    """Drain every pending socket message and forward it to ``on_log``.

    Does nothing when ``self._socket`` is falsy. ``frame`` and ``events``
    are accepted but not used here.
    """
    if not self._socket:
        return
    while self._socket.new_data:
        # recv() yields a pair; only the second item is used.
        _topic, payload = self._socket.recv()
        record = logging.makeLogRecord(payload)
        self.on_log(record)
def recent_events(self, events):
    """Forward at most one pending socket message to ``on_log`` and
    reduce ``self.alpha`` by the elapsed time, capped at 0.2.

    NOTE(review): the original formatting was lost; the alpha update is
    assumed to run on every call rather than only when a message was
    received - confirm against the upstream source.
    """
    has_message = self._socket and self._socket.new_data
    if has_message:
        # recv() yields a pair; only the second item is used.
        _topic, payload = self._socket.recv()
        record = logging.makeLogRecord(payload)
        self.on_log(record)
    fade_step = min(.2, events['dt'])
    self.alpha -= fade_step
def test_emitReturnsLoggedMessageId(self):
    """
    MDKHandler.emit returns the LoggedMessageId from the Session.
    """
    mdk, tracer = create_mdk_with_faketracer()
    session = mdk.session()
    handler = MDKHandler(mdk, lambda: session)

    record_attrs = {"name": "", "levelname": "INFO", "message": "hello"}
    message_id = handler.emit(logging.makeLogRecord(record_attrs))

    self.assertIsInstance(message_id, LoggedMessageId)
    self.assertEqual(message_id.traceId, session._context.traceId)
def test_emitReturnsLoggedMessageId(self):
    """
    MDKLoggingHandler.emit returns the LoggedMessageId.
    """
    mdk, tracer = create_mdk_with_faketracer()
    handler = MDKLoggingHandler(mdk)

    record_attrs = {"name": "", "levelname": "INFO", "message": "hello"}
    message_id = handler.emit(logging.makeLogRecord(record_attrs))

    self.assertIsInstance(message_id, LoggedMessageId)
def handle(self):
    """Deal with the incoming log data.

    Reads frames from ``self.connection``: a 4-byte big-endian length
    followed by that many payload bytes. Each payload is decoded with
    ``self.unpickle``, converted with ``logging.makeLogRecord`` and passed
    to ``self.handle_log_record``.

    Returns when fewer than four header bytes are available, or when the
    connection closes before a payload is complete (the partial payload
    is dropped).
    """
    # NOTE(review): unpickle presumably unpickles network data - confirm
    # that only trusted peers can connect.
    while True:
        header = self.connection.recv(4)
        if len(header) < 4:
            break
        struct_len = struct.unpack('>L', header)[0]
        payload = self.connection.recv(struct_len)
        while len(payload) < struct_len:
            received = self.connection.recv(struct_len - len(payload))
            if not received:
                # Empty read = peer hung up; stop here, otherwise this
                # inner loop would spin forever.
                return
            payload = payload + received
        obj = self.unpickle(payload)
        record = logging.makeLogRecord(obj)
        self.handle_log_record(record)
def run(self):
    """Run loop.

    Polls the subscriber without blocking until a stop is requested. Each
    received message is decoded as UTF-8 JSON, turned into a log record
    and handled by the 'sip.logging_aggregator' logger. The wait between
    polls grows with the number of consecutive empty polls.
    """
    log = logging.getLogger('sip.logging_aggregator')
    fail_count_limit = 100
    # Exponential relaxation of timeout in event loop.
    timeout = np.logspace(-6, -2, fail_count_limit)
    fail_count = 0

    while not self._stop_requested.is_set():
        try:
            topic, values = self._subscriber.recv_multipart(zmq.NOBLOCK)
            str_values = values.decode('utf-8')
            try:
                record = logging.makeLogRecord(json.loads(str_values))
                log.handle(record)
                fail_count = 0
            except json.decoder.JSONDecodeError:
                print('ERROR: Unable to convert JSON log record.')
                raise
        except zmq.ZMQError as e:
            if e.errno != zmq.EAGAIN:
                raise  # Re-raise the exception
            # Nothing available yet; count it as one more empty poll.
            fail_count += 1

        # Clamp the index so the longest wait is reused past the limit.
        wait_index = min(fail_count, fail_count_limit - 1)
        self._stop_requested.wait(timeout[wait_index])
def handle(self):
    """Handle multiple requests - each expected to be of 4-byte length,
    followed by the LogRecord in pickle format. Logs the record
    according to whatever policy is configured locally.

    Processing ends when fewer than four header bytes are read from
    ``self.connection``. If the connection closes in the middle of a
    payload, the incomplete payload is discarded and the method returns.
    """
    # NOTE(review): unpickle presumably unpickles network data - confirm
    # that only trusted peers can connect.
    while True:
        size_field = self.connection.recv(4)
        if len(size_field) < 4:
            break
        slen = struct.unpack(">L", size_field)[0]
        buf = self.connection.recv(slen)
        while len(buf) < slen:
            fragment = self.connection.recv(slen - len(buf))
            if not fragment:
                # A closed connection yields empty reads indefinitely;
                # return so the handler does not hang.
                return
            buf = buf + fragment
        obj = self.unpickle(buf)
        record = logging.makeLogRecord(obj)
        self.handle_log_record(record)
def get_record(self, name=None):
    """Build a log record from the common attributes.

    When ``name`` is given, the attributes in ``self.variants[name]``
    are layered on top of a copy of ``self.common``.
    """
    overrides = {} if name is None else self.variants[name]
    attrs = dict(self.common)  # copy so the shared defaults stay untouched
    attrs.update(overrides)
    return logging.makeLogRecord(attrs)
def test_html_formatter_format():
    """Angle-bracketed record fields must not appear verbatim in the
    HtmlFormatter output."""
    record_attrs = dict(name='<foo>', func='<module>', msg='Whatsup?',
                        funcName='test')
    logrecord = logging.makeLogRecord(record_attrs)

    rendered = formatters.HtmlFormatter().format(logrecord)

    assert '<foo>' not in rendered
    assert '<module>' not in rendered
def test_html_formatter_emoji():
    """With ``use_emoji=True`` the formatted text must contain the emoji
    mapped to the record's level, past the first character."""
    formatter = formatters.HtmlFormatter(use_emoji=True)

    emoji_level_map = {
        formatters.EMOJI.WHITE_CIRCLE: [logging.DEBUG],
        formatters.EMOJI.BLUE_CIRCLE: [logging.INFO],
        formatters.EMOJI.RED_CIRCLE: [logging.WARNING, logging.ERROR]
    }
    # Flatten the mapping into (emoji, level) pairs, keeping its order.
    cases = [(emoji, level)
             for emoji, levels in emoji_level_map.items()
             for level in levels]

    for emoji, level in cases:
        attrs = {'levelno': level,
                 'levelname': logging.getLevelName(level)}
        rendered = formatter.format(logging.makeLogRecord(attrs))
        assert rendered.find(emoji) > 0, 'Emoji not found in %s' % level
def test_emit(handler):
    """Emitting a record issues exactly one ``requests.post`` call with
    the expected JSON payload."""
    record = logging.makeLogRecord({'msg': 'hello'})

    with mock.patch('requests.post') as patch:
        handler.emit(record)

    assert patch.called
    assert patch.call_count == 1

    sent_json = patch.call_args[1]['json']
    assert sent_json['chat_id'] == 'bar'
    assert 'hello' in sent_json['text']
    assert sent_json['parse_mode'] == 'HTML'
def test_emit_http_exception(handler):
    """An HTTP 500 reply makes the handler module's logger record both an
    error and a debug message."""
    record = logging.makeLogRecord({'msg': 'hello'})

    # Hand-built failing response returned by the patched requests.post.
    failing_response = requests.Response()
    failing_response.status_code = 500
    failing_response._content = 'Server error'.encode()

    with mock.patch('requests.post') as patch:
        patch.return_value = failing_response
        handler.emit(record)

    captured = telegram_handler.handlers.logger.handlers[0].messages
    assert captured['error']
    assert captured['debug']
def test_emit_telegram_error(handler):
    """An HTTP 200 reply whose JSON body has ``ok`` set to False makes
    the handler module's logger record a warning."""
    record = logging.makeLogRecord({'msg': 'hello'})

    # Successful HTTP status, but the JSON body reports a failure.
    not_ok_response = requests.Response()
    not_ok_response.status_code = 200
    not_ok_response._content = json.dumps({'ok': False}).encode()

    with mock.patch('requests.post') as patch:
        patch.return_value = not_ok_response
        handler.emit(record)

    captured = telegram_handler.handlers.logger.handlers[0].messages
    assert captured['warning']
def test_adds_padding(self):
    """Emitting an empty record calls ``infobox`` once with sizes 4, 10."""
    empty_record = logging.makeLogRecord({})
    self.handler.emit(empty_record)
    self.d.infobox.assert_called_once_with(mock.ANY, 4, 10)
def test_args_in_msg_get_replaced(self):
    """%-style args are interpolated into the message before display."""
    # Precondition: the formatted text fits on one line.
    assert len('123456') <= self.handler.width

    record = logging.makeLogRecord({'msg': '123%s', 'args': (456,)})
    self.handler.emit(record)

    self.d.infobox.assert_called_once_with('123456', mock.ANY, mock.ANY)
def test_wraps_nospace_is_greedy(self):
    """Text without whitespace is broken after the last character that
    fits on the line."""
    # Precondition: the message is wider than the handler.
    assert len('1234567') > self.handler.width

    record = logging.makeLogRecord({'msg': '1234567'})
    self.handler.emit(record)

    self.d.infobox.assert_called_once_with('123456\n7', mock.ANY, mock.ANY)
def test_wraps_at_whitespace(self):
    """Text containing a space is wrapped at that space."""
    # Precondition: the message is wider than the handler.
    assert len('123 567') > self.handler.width

    record = logging.makeLogRecord({'msg': '123 567'})
    self.handler.emit(record)

    self.d.infobox.assert_called_once_with('123\n567', mock.ANY, mock.ANY)
def test_only_last_lines_are_printed(self):
    """When the message has more lines than fit, only the trailing lines
    are displayed."""
    # Precondition: more lines than the handler is tall.
    assert len('a\nb\nc'.split()) > self.handler.height

    record = logging.makeLogRecord({'msg': 'a\n\nb\nc'})
    self.handler.emit(record)

    self.d.infobox.assert_called_once_with('b\nc', mock.ANY, mock.ANY)
def test_race(self):
    """Log through a WatchedFileHandler while another thread keeps
    deleting the log file, once for each ``delay`` setting."""
    # Issue #14632 refers.
    def unlink_repeatedly(fname, tries):
        for _ in range(tries):
            try:
                os.unlink(fname)
            except OSError:
                pass
            time.sleep(0.004 * random.randint(0, 4))

    del_count = 500
    log_count = 500

    for delay in (False, True):
        fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
        os.close(fd)

        remover = threading.Thread(target=unlink_repeatedly,
                                   args=(fn, del_count))
        remover.daemon = True
        remover.start()

        watched = logging.handlers.WatchedFileHandler(fn, delay=delay)
        watched.setFormatter(
            logging.Formatter('%(asctime)s: %(levelname)s: %(message)s'))
        try:
            for _ in range(log_count):
                time.sleep(0.005)
                watched.handle(logging.makeLogRecord({'msg': 'testing'}))
        finally:
            remover.join()
            try:
                watched.close()
            except ValueError:
                pass
            if os.path.exists(fn):
                os.unlink(fn)

# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
def log(self, msg: Message):
    """Format ``msg`` with ``self.FORMAT`` and emit it through
    ``self.loghandler``.

    The format mapping is the message's ``_asdict()`` plus three derived
    keys: ``asctime``, ``srcname`` and ``srcid``.
    """
    fields = msg._asdict()

    # Timestamp rendered in the configured time zone.
    stamp = datetime.fromtimestamp(msg.time, self.tz)
    fields['asctime'] = stamp.strftime('%Y-%m-%d %H:%M:%S')
    fields['srcname'] = msg.src.alias
    fields['srcid'] = msg.src.id

    record = logging.makeLogRecord({'msg': self.FORMAT % fields})
    self.loghandler.emit(record)
def post(self):
    """Turn the JSON request body into a log record and hand it to
    ``self.log``."""
    record_attrs = self.get_json_body()
    record = logging.makeLogRecord(record_attrs)
    self.log.handle(record)
def test_empty_filter(self):
    """A Filter created without a name lets a record through."""
    record = logging.makeLogRecord({'name': 'spam.eggs'})
    unnamed_filter = logging.Filter()
    self.assertTrue(unnamed_filter.filter(record))

#
#   First, we define our levels. There can be as many as you want - the only
#     limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
def test_race(self):
    """Log through a WatchedFileHandler while another thread keeps
    deleting the log file, once for each ``delay`` setting."""
    # Issue #14632 refers.
    def unlink_repeatedly(fname, tries):
        for _ in range(tries):
            try:
                os.unlink(fname)
            except OSError:
                pass
            time.sleep(0.004 * random.randint(0, 4))

    del_count = 500
    log_count = 500

    for delay in (False, True):
        fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
        os.close(fd)

        remover = threading.Thread(target=unlink_repeatedly,
                                   args=(fn, del_count))
        remover.daemon = True
        remover.start()

        watched = logging.handlers.WatchedFileHandler(fn, delay=delay)
        watched.setFormatter(
            logging.Formatter('%(asctime)s: %(levelname)s: %(message)s'))
        try:
            for _ in range(log_count):
                time.sleep(0.005)
                watched.handle(logging.makeLogRecord({'msg': 'testing'}))
        finally:
            remover.join()
            watched.close()
            if os.path.exists(fn):
                os.unlink(fn)
def handle_datagram(self, request):
    """Decode one pickled log record from ``request.packet``, append its
    message to ``self.log_output`` and set the ``handled`` event."""
    # The packet starts with a '>L' length prefix, which is skipped.
    prefix_size = struct.calcsize('>L')
    pickled = request.packet[prefix_size:]

    record = logging.makeLogRecord(pickle.loads(pickled))

    self.log_output += record.msg + '\n'
    self.handled.set()
def setUp(self):
    """Prepare two records with the messages 'one' and 'two'."""
    self.records = [
        logging.makeLogRecord({'msg': text}) for text in ('one', 'two')
    ]
def test_str_rep(self):
    """``str()`` of a record starts with '<LogRecord: ' and ends with '>'."""
    text = str(logging.makeLogRecord({}))
    self.assertTrue(text.startswith('<LogRecord: '))
    self.assertTrue(text.endswith('>'))
def test_multiprocessing(self):
    """A record's ``processName`` is 'MainProcess' and, when
    multiprocessing is importable, equals the current process name."""
    first = logging.makeLogRecord({})
    self.assertEqual(first.processName, 'MainProcess')
    try:
        import multiprocessing as mp
    except ImportError:
        # multiprocessing is unavailable; nothing more to check.
        return
    second = logging.makeLogRecord({})
    self.assertEqual(second.processName, mp.current_process().name)
def test_delay(self):
    """With ``delay=True`` a FileHandler opens its file only when the
    first record is handled."""
    os.unlink(self.fn)
    handler = logging.FileHandler(self.fn, delay=True)

    # Before any record: no stream and no file on disk.
    self.assertIsNone(handler.stream)
    self.assertFalse(os.path.exists(self.fn))

    handler.handle(logging.makeLogRecord({}))

    # After the first record: stream opened and file created.
    self.assertIsNotNone(handler.stream)
    self.assertTrue(os.path.exists(self.fn))
    handler.close()
def test_rollover(self):
    """Emit two records just over a second apart through a per-second
    TimedRotatingFileHandler and check that a rotated file appears."""
    handler = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
                                                        backupCount=1)
    handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))

    handler.emit(logging.makeLogRecord({'msg': 'testing - initial'}))
    self.assertLogFile(self.fn)
    time.sleep(1.1)    # a little over a second ...
    handler.emit(logging.makeLogRecord({'msg': 'testing - after delay'}))
    handler.close()

    # At this point, we should have a recent rotated file which we
    # can test for the existence of. However, in practice, on some
    # machines which run really slowly, we don't know how far back
    # in time to go to look for the log file. So, we go back a fair
    # bit, and stop as soon as we see a rotated file. In theory this
    # could of course still fail, but the chances are lower.
    now = datetime.datetime.now()
    GO_BACK = 5 * 60  # seconds
    found = False
    for secs in range(GO_BACK):
        stamp = now - datetime.timedelta(seconds=secs)
        candidate = self.fn + stamp.strftime(".%Y-%m-%d_%H-%M-%S")
        if os.path.exists(candidate):
            found = True
            self.rmfiles.append(candidate)
            break

    msg = 'No rotated files found, went back %d seconds' % GO_BACK
    if not found:
        # print additional diagnostics
        dn, fn = os.path.split(self.fn)
        files = [f for f in os.listdir(dn) if f.startswith(fn)]
        print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"),
              file=sys.stderr)
        print('The only matching files are: %s' % files, file=sys.stderr)
        for f in files:
            print('Contents of %s:' % f)
            with open(os.path.join(dn, f), 'r') as tf:
                print(tf.read())
    self.assertTrue(found, msg=msg)
def test_basic(self):
    """Write one record through NTEventLogHandler and look for it among
    the most recent 'Application' event log entries."""
    logtype = 'Application'
    elh = win32evtlog.OpenEventLog(None, logtype)
    num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)

    handler = logging.handlers.NTEventLogHandler('test_logging')
    handler.handle(logging.makeLogRecord({'msg': 'Test Log Message'}))
    handler.close()

    # Now see if the event is recorded
    self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))

    flags = (win32evtlog.EVENTLOG_BACKWARDS_READ |
             win32evtlog.EVENTLOG_SEQUENTIAL_READ)
    GO_BACK = 100
    events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)

    found = False
    for event in events:
        if event.SourceName == 'test_logging':
            text = win32evtlogutil.SafeFormatMessage(event, logtype)
            if text == 'Test Log Message\r\n':
                found = True
                break

    msg = 'Record not found in event log, went back %d records' % GO_BACK
    self.assertTrue(found, msg=msg)

# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.