The following 16 code examples, extracted from open-source Python projects, illustrate how to obtain and use `logging.Logger` instances (e.g. via `logging.getLogger()`).
def build_yt_api():
    """Build the YouTube discovery API client and store it globally.

    Returns:
        bool: True when the client was built, False when the API key is
        missing or the build raised.
    """
    keys = datatools.get_data()["discord"]["keys"]
    if "google_api_key" not in keys:
        logger.warning("No API key found with name 'google_api_key'")
        logger.info("Please add your Google API key with name 'google_api_key' "
                    "in data.json to use YouTube features of the music module")
        return False

    logger.debug("Building YouTube discovery API")
    try:
        global ytdiscoveryapi
        ytdiscoveryapi = googleapiclient.discovery.build(
            "youtube", "v3", developerKey=keys["google_api_key"])
        logger.debug("YouTube API build successful")
        return True
    except Exception as e:
        # Best-effort: log and report failure instead of crashing startup.
        logger.exception(e)
        logger.warning("HTTP error connecting to YouTube API, YouTube won't be available")
        return False
def build_sc_api():
    """Build the SoundCloud API client and store it globally.

    Returns:
        bool: True on success, False when the client id is missing or the
        client could not be constructed.
    """
    keys = datatools.get_data()["discord"]["keys"]
    if "soundcloud_client_id" not in keys:
        logger.warning("No API key found with name 'soundcloud_client_id'")
        logger.info("Please add your SoundCloud client id with name 'soundcloud_client_id' "
                    "in data.json to use Soundcloud features of the music module")
        return False

    try:
        global scclient
        scclient = soundcloud.Client(client_id=keys["soundcloud_client_id"])
        logger.debug("SoundCloud build successful")
        return True
    except Exception as e:
        # Best-effort: log and report failure instead of crashing startup.
        logger.exception(e)
        return False
def build_spotify_api():
    """Build the Spotify API client and store it globally.

    Returns:
        bool: True on success, False when either credential is missing or
        the client could not be constructed.
    """
    keys = datatools.get_data()["discord"]["keys"]
    if "spotify_client_id" not in keys:
        logger.warning("No API key found with name 'spotify_client_id'")
        logger.info("Please add your Spotify client id with name 'spotify_client_id' "
                    "in data.json to use Spotify features of the music module")
        return False
    if "spotify_client_secret" not in keys:
        logger.warning("No API key found with name 'spotify_client_secret'")
        logger.info("Please add your Spotify client secret with name 'spotify_client_secret' "
                    "in data.json to use Spotify features of the music module")
        return False

    try:
        global spclient
        credentials = SpotifyClientCredentials(
            keys["spotify_client_id"], keys["spotify_client_secret"])
        spclient = spotipy.Spotify(client_credentials_manager=credentials)
        logger.debug("Spotify build successful")
        return True
    except Exception as e:
        # Best-effort: log and report failure instead of crashing startup.
        logger.exception(e)
        return False
def get_sc_tracks(result):
    """Resolve a SoundCloud API result into a list of [stream_url, title] pairs.

    A "track" result yields a single-entry list; a "user" result yields that
    user's tracks (up to 50); a "playlist" result yields the playlist's
    tracks. Any other kind yields None.
    """
    kind = result.kind
    if kind == "track":
        logger.debug("SoundCloud Track {}".format(result.title))
        return [[result.stream_url, result.title]]
    if kind == "user":
        logger.debug("SoundCloud User {}".format(result.username))
        user_tracks = scclient.get("/users/{}/tracks".format(result.id), limit=50)
        return [[t.stream_url, t.title] for t in user_tracks]
    if kind == "playlist":
        logger.debug("SoundCloud Playlist {}".format(result.title))
        playlist = scclient.get("/playlists/{}".format(result.id), limit=50)
        # Playlist tracks come back as dicts, not resource objects.
        return [[t["stream_url"], t["title"]] for t in playlist.tracks]
    return None
def log_error(self, log_msg, sql='', err_msg='', ex=None):
    """Log (or raise) a formatted error block.

    Appends the exception's args to *err_msg*, builds a highlighted message
    block, then either raises it (when config 'on_errors' == 'throw') or
    logs it, records it in self.errors, and optionally prints it.
    """
    if ex:
        err_msg += ''.join(str(arg) for arg in ex.args)
    # Drop trailing newlines so the closing separator stays attached.
    err_msg = err_msg.rstrip('\n')
    msg = """
<red>==========================================================================
ERROR tijdens : {}
SQL: {}
ERROR: {}
==========================================================================</>""".format(log_msg, sql, err_msg)
    if self.config.get('on_errors') == 'throw':
        raise Exception(msg)
    self.logger.error(self.strip_formatting_tags(msg))
    self.errors.append(msg)
    if self.to_console:
        Logger.pprint(msg)
        # brief pause so the log message is not garbled by the error output
        sleep(0.1)
def log(request):
    """Return a logging.Logger instance scoped to this test session.

    The child logger is named after the test function, with any
    parametrization suffix (the '[...]' part) stripped.
    """
    base_name = request.node.name.partition('[')[0]
    return pytest.log.getChild(base_name)
def __init__(self, opsim_db, db_config=None, logger=None):
    """
    Constructor.

    Parameters
    ----------
    opsim_db : str
        sqlite3 db file containing observing plan.
    db_config : dict, optional
        Dictionary of database connection parameters.  Parameters for
        connecting to fatboy.phys.washington.edu from a whitelisted
        machine will be used.
    logger : logging.Logger, optional
        Logger object.  If omitted, a basic stdout logger at INFO level
        is configured and used.
    """
    self.gen = ObservationMetaDataGenerator(database=opsim_db, driver='sqlite')
    if db_config is None:
        # Default fatboy connection parameters (whitelisted machines only).
        self.db_config = dict(database='LSSTCATSIM',
                              port=1433,
                              host='fatboy.phys.washington.edu',
                              driver='mssql+pymssql')
    else:
        self.db_config = db_config
    if logger is None:
        logging.basicConfig(format="%(message)s", level=logging.INFO,
                            stream=sys.stdout)
        logger = logging.getLogger()
    self.logger = logger
def start_logger(filename, level):
    """Create the module-wide swan logger once and return it.

    Subsequent calls return the already-created logger unchanged.

    :param filename: path of the log file to write to
    :param level: level name (case-insensitive, e.g. 'info', 'DEBUG')
    :returns: the configured logging.Logger
    """
    global swan_logger
    if swan_logger is not None:
        return swan_logger

    numeric_level = getattr(logging, level.upper())
    handler = logging.FileHandler(filename=filename)
    handler.setLevel(numeric_level)

    new_logger = logging.getLogger(LOGGERNAME)
    new_logger.setLevel(numeric_level)
    new_logger.addHandler(handler)

    swan_logger = new_logger
    return new_logger
def get_logger():
    """Get the current logger object.

    Returns None until start_logger() has been called.

    :returns: logger
    :rtype: logging.Logger
    """
    # Reading a module global needs no 'global' declaration.
    return swan_logger
def __init__(self):
    """Initialize an empty Logger wrapper with default state."""
    self.logger = None                     # type: logging.Logger
    self.start_time = datetime.now()       # type: datetime
    self.last_start_time = datetime.now()  # type: datetime
    self.errors = []                       # type: List[str]
    self.to_console = True                 # type: bool
    self.filename = ''                     # type: str
def create_logger(logger_type: str = LoggerTypes.MAIN, runid: float = 0, configs=None,
                  to_console: bool = True, filename_args='') -> 'Logger':
    """Create a Logger wrapper around a file-based logging.Logger.

    Parameters
    ----------
    logger_type : str
        One of the LoggerTypes values; also used as the logging channel name.
    runid : float
        Run identifier used when building the log file name.
    configs : dict, optional
        Configuration dictionary; a 'log_to_console' entry overrides
        *to_console*.  Defaults to an empty dict.
    to_console : bool
        When True, messages are echoed to the console as well.
    filename_args : str
        Extra arguments used when building the log file name.

    Returns
    -------
    Logger
        A Logger wrapper holding the configured logging.Logger.
    """
    # Fixed: was `configs={}`, a mutable default argument shared across calls.
    if configs is None:
        configs = {}

    logger = logging.getLogger(logger_type)
    # Build the log file location for this logger type / run.
    path, filename = Logger.__create_path_and_filename_by_type(logger_type, runid,
                                                               configs, filename_args)
    # Attach the file handler only once per named logger so repeated calls
    # do not duplicate output.
    if len(logger.handlers) == 0:
        if logger_type == LoggerTypes.MAIN:
            formatter = logging.Formatter('%(asctime)s - %(message)s')
        else:
            formatter = logging.Formatter('%(message)s')
        file_handler = logging.FileHandler(path + filename)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.setLevel(logging.INFO)

    log_obj = Logger()
    log_obj.logger = logger
    log_obj.start_time = datetime.now()
    log_obj.last_start_time = datetime.now()
    log_obj.errors = []
    log_obj.to_console = to_console
    if 'log_to_console' in configs:
        log_obj.to_console = configs['log_to_console']
    log_obj.filename = filename
    log_obj.config = configs
    return log_obj
def log_simple(self, msg: str) -> None:
    """Write *msg* to the log (formatting tags stripped) and, when enabled,
    echo the tagged message to the console."""
    plain = self.strip_formatting_tags(msg)
    self.logger.info(plain)
    if self.to_console:
        Logger.pprint(msg)
def log(self, descr: str, last_start_time: datetime = None, rowcount: int = -1, newline: bool = False, indent_level=0) -> None:
    """Log *descr* decorated with elapsed-time information.

    Measures the time since *last_start_time* (defaulting to the time of
    the previous log call) and since self.start_time, pads/indents the
    description, writes it to self.logger and optionally the console, and
    finally advances self.last_start_time to now.

    :param descr: description text; may contain formatting tags that are
        stripped for the file log but kept for console output
    :param last_start_time: reference time for the "executed in" duration;
        None/falsy means "since the previous log call"
    :param rowcount: when >= 0, the message reports "executed on N rows"
    :param newline: when True, append a trailing CRLF to the message
    :param indent_level: number of leading spaces; levels >= 4 also get a
        '-' bullet prefix
    """
    if not last_start_time:
        last_start_time = self.last_start_time
    newline_str = ''  # type: str
    if newline:
        newline_str = '\r\n'
    end_time = datetime.now()
    global_time_descr = ''
    if last_start_time:
        # Elapsed since construction and since the reference point.
        global_elapsed_time = end_time - self.start_time
        elapsed_time_since_last_log = end_time - last_start_time
        global_time_descr = self.time_descr(global_elapsed_time)
    if descr:
        # Deep indent levels get a bullet marker before the indent spaces.
        if indent_level >= 4:
            descr = '-' + descr
        for i in range(indent_level):
            descr = ' ' + descr
        # Pad the (tag-stripped) text out to 60 columns so timing info aligns.
        for i in range(len(self.strip_formatting_tags(descr)), 60):
            descr += ' '
    # divmod(elapsedTime.total_seconds(), 60)
    if descr and self.logger:
        if global_time_descr:
            if rowcount >= 0:
                descr = '{0} <lightgray>executed on {1} rows in {2} ({3} since start) {4}</>'.format(descr, rowcount, self.time_descr(elapsed_time_since_last_log), global_time_descr, newline_str)
            else:
                descr = '{0} <lightgray>executed in {1} ({2} since start) {3}</>'.format(descr, self.time_descr(elapsed_time_since_last_log), global_time_descr, newline_str)
        else:
            descr = '{}{}'.format(descr, newline_str)
        self.logger.info(self.strip_formatting_tags(descr))
        if self.to_console:
            Logger.pprint(descr)
    # Next call measures from here unless an explicit reference is given.
    self.last_start_time = end_time
def get_ytvideos(query, ilogger):
    """
    Gets either a list of videos from a playlist or a single video, using the
    first result of a YouTube search

    Args:
        query (str): The YouTube search query
        ilogger (logging.Logger): The logger to log API calls to

    Returns:
        queue (list): The items obtained from the YouTube search
    """
    # Take only the top search hit, which may be a video or a playlist.
    search_result = ytdiscoveryapi.search().list(
        q=query,
        part="id,snippet",
        maxResults=1,
        type="video,playlist"
    ).execute()

    items = search_result["items"]
    if not items:
        return []

    top = items[0]
    title = top["snippet"]["title"]
    ilogger.info("Queueing {}".format(title))

    kind = top["id"]["kind"]
    if kind == "youtube#video":
        url = "https://www.youtube.com/watch?v={}".format(top["id"]["videoId"])
        return [[url, title]]
    if kind == "youtube#playlist":
        return get_queue_from_playlist(top["id"]["playlistId"])
    return []
def make_instance_catalog(self, obsHistID, band, boundLength, outfile=None):
    """
    Method to create instance catalogs.

    Parameters
    ----------
    obsHistID : int
        obsHistID for the desired visit from the opsim db file.
    band : str
        Desired LSST filter to use, ugrizy.
    boundLength : float
        Radius in degrees of sky cone in which to produce objects.
    outfile : str, optional
        File name of the instance catalog to be produced.  If None,
        a default name will be generated, e.g.,
        phosim_input_0000230_r_0.3deg.txt.
    """
    if outfile is None:
        outfile = 'phosim_input_%07i_%s_%.1fdeg.txt' % (obsHistID, band, boundLength)
    # First (and only used) observation metadata entry for this visit.
    obs_md = self.gen.getObservationMetaData(obsHistID=obsHistID, boundLength=boundLength)[0]
    do_header = True
    # assumes self.star_objs / self.gal_objs are iterables of catalog object
    # ids set elsewhere on this class — TODO confirm against the full class
    for objid in self.star_objs:
        self.logger.info("processing %s", objid)
        db_obj = CatalogDBObject.from_objid(objid, **self.db_config)
        phosim_object = PhoSimCatalogPoint(db_obj, obs_metadata=obs_md)
        if do_header:
            # Write the phosim header once, truncating any existing file;
            # all catalog data is then appended below.
            with open(outfile, 'w') as file_obj:
                phosim_object.write_header(file_obj)
            do_header = False
        phosim_object.write_catalog(outfile, write_mode='a', write_header=False,
                                    chunk_size=20000)
    for objid in self.gal_objs:
        self.logger.info("processing %s", objid)
        db_obj = CatalogDBObject.from_objid(objid, **self.db_config)
        phosim_object = PhoSimCatalogSersic2D(db_obj, obs_metadata=obs_md)
        phosim_object.write_catalog(outfile, write_mode='a', write_header=False,
                                    chunk_size=20000)
def get_module_logger(modulename='experiment', loglevel=logging.INFO):
    """Get a logging.Logger instance with reasonable defaults.

    Create (or fetch) a logger and configure its name, loglevel, formatter
    and output stream handling.

    1. normalize the name from arg 'modulename' (strip a leading
       'smp_graphs' package prefix, keep at most the last 20 characters)
    2. set loglevel from arg 'loglevel' (int constant or one of
       'debug'/'info'/'warn'; unknown names fall back to INFO)
    3. attach a matching StreamHandler with formatting, once per name
    4. return the logger
    """
    loglevels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warn': logging.WARNING}
    if isinstance(loglevel, str):
        # Fixed: was `type(loglevel) is str` plus a bare `except:`; unknown
        # level names still fall back to INFO, but without swallowing
        # unrelated exceptions.
        loglevel = loglevels.get(loglevel, logging.INFO)

    # Shorten the logger name for readable, aligned output.
    if modulename.startswith('smp_graphs'):
        modulename = '.'.join(modulename.split('.')[1:])
    if len(modulename) > 20:
        modulename = modulename[-20:]

    logger = logging.getLogger(modulename)
    logger.setLevel(loglevel)

    # Fixed: the original attached a new StreamHandler on every call, so
    # repeated calls for the same name duplicated each log line.
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setLevel(loglevel)
        formatter = logging.Formatter('%(levelname)8s: %(name)20s: %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # suppress double log output via the root logger
    logger.propagate = False
    return logger

# function composition
# https://mathieularose.com/function-composition-in-python/