我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用logging.log()。
def init_logger(self, args):
    """Configure root logging: console + rotating file handler.

    Level is INFO by default, raised to VERBOSE with --verbose and DEBUG
    with --debug.  Noisy third-party loggers are capped at WARNING.
    """
    level = logging.INFO
    if args.verbose:
        # BUG FIX: the stdlib logging module has no VERBOSE attribute, so
        # `logging.VERBOSE` raised AttributeError.  Fall back to 15 (between
        # DEBUG=10 and INFO=20) unless a custom level was registered.
        level = getattr(logging, 'VERBOSE', 15)
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
                        level=level)
    rt_handler = RotatingFileHandler('arbitrage.log',
                                     maxBytes=100 * 1024 * 1024,
                                     backupCount=10)
    rt_handler.setLevel(level)
    formatter = logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s')
    rt_handler.setFormatter(formatter)
    logging.getLogger('').addHandler(rt_handler)
    # Silence per-request chatter from the HTTP stack.
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
def download(url, name, path):
    """Stream *url* to *path*, writing through a '.temp' file.

    Downloads into path + '.temp' and renames on success so a partial
    download never leaves a corrupt file at *path*.  Raises HTTPError on
    a non-OK status.
    """
    print('Downloading: {0}'.format(name))
    r = requests.get(url, stream=True)
    if r.status_code != requests.codes.ok:
        logging.log(level=logging.ERROR,
                    msg='Unable to connect {0}'.format(url))
        r.raise_for_status()
    # BUG FIX: Content-Length may be absent (e.g. chunked transfer);
    # int(None) raised TypeError.  Default to 0 so the bar still renders.
    total_size = int(r.headers.get('Content-Length', 0))
    dir_name = os.path.dirname(path)
    temp_name = path + '.temp'
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)
    if os.path.exists(temp_name):
        os.remove(temp_name)
    with click.progressbar(r.iter_content(1024), length=total_size) as bar, \
            open(temp_name, 'wb') as file:
        for chunk in bar:
            file.write(chunk)
            bar.update(len(chunk))
    # Atomic-ish publish: only expose the file once fully written.
    os.rename(temp_name, path)
def parse(self, response):
    """Parse a listing page: yield one request per movie link, then follow
    the pagination link back into this callback."""
    item = response.css('div.listBox ul li ')
    hrefs = item.css('div.listimg a::attr(href)').extract()
    # titles = item.css('div.listInfo h3 p::text').extract()
    # logging.log(logging.INFO, "parse " + len(hrefs))
    # Schedule every movie detail page for parse_movie.
    for href in hrefs:
        # logging.log(logging.INFO, "hrefs[" + index + "]=" + href)
        try:
            yield scrapy.Request(response.urljoin(href),
                                 callback=self.parse_movie)
        except Exception as e:
            continue
    # Follow the "next page" anchor.  NOTE(review): the label literal was
    # mangled to '???' during extraction — confirm against the site markup.
    next_page_str = u'???'
    rex = '//div[@class="pagebox"]/a[contains(text(), "%s")]/@href' % next_page_str
    next_page = response.xpath(rex).extract_first()
    # Stop recursing when there is no next-page link on the page.
    if next_page is not None:
        next_page = response.urljoin(next_page)
        yield scrapy.Request(next_page, callback=self.parse)
def open_spider(self, spider):
    """Scrapy pipeline hook: open the MySQL connection and cursor."""
    logging.log(logging.INFO, "open_spider")
    connection = Connector.connect(**self.config)
    self.con = connection
    self.cursor = connection.cursor()
def clean_hints(p, hints, strict_lvl, warnings):
    """Flag a redundant 'package:' / 'package - ' prefix at the start of sdesc.

    The comparison is case-insensitive and uses a base package name: a
    leading 'lib' is trimmed and any soversion digits or '-devel' suffix
    removed.  A hit is logged at strict_lvl.  Returns the (possibly updated)
    warnings flag.
    """
    sdesc = hints.get('sdesc')
    if sdesc is not None:
        prefix = re.match(r'^"(.*?)(\s*:|\s+-)', sdesc)
        if prefix:
            base = re.sub(r'^lib(.*?)(|-devel|\d*)$', r'\1', p)
            if base.upper().startswith(prefix.group(1).upper()):
                logging.log(strict_lvl, "package '%s' sdesc starts with '%s'; this is redundant as the UI will show both the package name and sdesc" % (p, ''.join(prefix.group(1, 2))))
                warnings = True
    return warnings
def handleHeader(self, key, value):
    """Inspect one server response header, rewriting or dropping it before
    it is relayed to the client (sslstrip-style MITM relay)."""
    logging.log(self.getLogLevel(), "Got server header: %s:%s" % (key, value))
    if (key.lower() == 'location'):
        # Rewrite https:// redirect targets so the victim stays on http.
        value = self.replaceSecureLinks(value)
    if (key.lower() == 'content-type'):
        if (value.find('image') != -1):
            # Images are passed through without content rewriting.
            self.isImageRequest = True
            logging.debug("Response is image content, not scanning...")
    if (key.lower() == 'content-encoding'):
        if (value.find('gzip') != -1):
            logging.debug("Response is compressed...")
            self.isCompressed = True
    elif (key.lower() == 'content-length'):
        # Held back; body length changes after link rewriting.
        self.contentLength = value
    elif (key.lower() == 'set-cookie'):
        # Raw header preserves multiple Set-Cookie values.
        self.client.responseHeaders.addRawHeader(key, value)
    elif (key.lower() == 'strict-transport-security'):
        # Deliberately NOT forwarded: relaying HSTS would prevent future
        # HTTPS-stripping of this host.
        logging.log(self.getLogLevel(), "LEO Erasing Strict Transport Security....")
    else:
        self.client.setHeader(key, value)
def handleHeader(self, key, value):
    """Inspect one server response header, rewriting it where needed before
    relaying it to the client (variant without HSTS stripping)."""
    logging.log(self.getLogLevel(), "Got server header: %s:%s" % (key, value))
    if (key.lower() == 'location'):
        # Rewrite https:// redirect targets back to http.
        value = self.replaceSecureLinks(value)
    if (key.lower() == 'content-type'):
        if (value.find('image') != -1):
            # Images are passed through without content rewriting.
            self.isImageRequest = True
            logging.debug("Response is image content, not scanning...")
    if (key.lower() == 'content-encoding'):
        if (value.find('gzip') != -1):
            logging.debug("Response is compressed...")
            self.isCompressed = True
    elif (key.lower() == 'content-length'):
        # Held back; body length changes after link rewriting.
        self.contentLength = value
    elif (key.lower() == 'set-cookie'):
        # Raw header preserves multiple Set-Cookie values.
        self.client.responseHeaders.addRawHeader(key, value)
    else:
        self.client.setHeader(key, value)
def __AddFilterToQuery(self, identifier, condition, value, query):
    """Add a filter condition to a query based on the inputs.

    Args:
      identifier: name of the property (or self.__ANCESTOR for ancestors)
      condition: test condition
      value: test value passed from the caller
      query: query to add the filter to
    """
    if identifier != self.__ANCESTOR:
        filter_condition = '%s %s' % (identifier, condition)
        # Log the value's class rather than the value itself (avoids
        # dumping user data into the log).
        logging.log(LOG_LEVEL, 'Setting filter on "%s" with value "%s"',
                    filter_condition, value.__class__)
        datastore._AddOrAppend(query, filter_condition, value)
    else:
        # ANCESTOR is not a normal property filter; it pins the query root.
        logging.log(LOG_LEVEL, 'Setting ancestor query for ancestor %s', value)
        query.Ancestor(value)
def __AcceptRegex(self, regex):
    """Advance and return the symbol if the next symbol matches the regex.

    Args:
      regex: the compiled regular expression to attempt acceptance on.

    Returns:
      The first group in the expression to allow for convenient access
      to simple matches. Requires () around some objects in the regex.
      None if no match is found.
    """
    if self.__next_symbol < len(self.__symbols):
        match_symbol = self.__symbols[self.__next_symbol]
        logging.log(LOG_LEVEL, '\taccept %s on symbol %s', regex, match_symbol)
        match = regex.match(match_symbol)
        if match:
            # Only consume the symbol on a successful match.
            self.__next_symbol += 1
            if match.groups():
                matched_string = match.group(1)
                logging.log(LOG_LEVEL, '\taccepted %s', matched_string)
                return matched_string
    return None
def __Identifier(self):
    """Consume an identifier and return it.

    Returns:
      The identifier string. If quoted, the surrounding quotes are stripped.
    """
    logging.log(LOG_LEVEL, 'Try Identifier')
    identifier = self.__AcceptRegex(self.__identifier_regex)
    if identifier:
        if identifier.upper() in self.__active_reserved_words:
            # Push the symbol back so the error points at the keyword.
            self.__next_symbol -= 1
            self.__Error('Identifier is a reserved keyword')
    else:
        # Not a bare identifier; try the quoted form ("name").
        identifier = self.__AcceptRegex(self.__quoted_identifier_regex)
        if identifier:
            # Strip the surrounding quotes and unescape doubled quotes.
            identifier = identifier[1:-1].replace('""', '"')
    return identifier
def __Reference(self):
    """Consume a parameter reference and return it.

    Consumes a reference to a positional parameter (:1) or a named parameter
    (:email). Only consumes a single reference (not lists).

    Returns:
      The name of the reference (integer for positional parameters or string
      for named parameters) to a bind-time parameter.
    """
    logging.log(LOG_LEVEL, 'Try Reference')
    reference = self.__AcceptRegex(self.__ordinal_regex)
    if reference:
        # Positional references are returned as ints (e.g. :1 -> 1).
        return int(reference)
    else:
        reference = self.__AcceptRegex(self.__named_regex)
        if reference:
            return reference
    return None
def __OrderList(self):
    """Consume variables and sort order for ORDER BY clause."""
    identifier = self.__Identifier()
    if identifier:
        if self.__Accept('DESC'):
            self.__orderings.append((identifier, datastore.Query.DESCENDING))
        elif self.__Accept('ASC'):
            self.__orderings.append((identifier, datastore.Query.ASCENDING))
        else:
            # ASC is the default when neither keyword follows the property.
            self.__orderings.append((identifier, datastore.Query.ASCENDING))
    else:
        self.__Error('Invalid ORDER BY Property')
    logging.log(LOG_LEVEL, self.__orderings)
    if self.__Accept(','):
        # Recurse to consume the remaining comma-separated order terms.
        return self.__OrderList()
    # ORDER BY done; hand off to the next clause in the grammar.
    return self.__Limit()
def __Offset(self):
    """Consume the OFFSET clause."""
    if self.__Accept('OFFSET'):
        # -1 is the "not set" sentinel; LIMIT x,y may already have set it.
        if self.__offset != -1:
            self.__Error('Offset already defined in LIMIT clause')
        offset = self.__AcceptRegex(self.__number_regex)
        if offset:
            self.__offset = int(offset)
            if self.__offset < 0:
                self.__Error('Bad offset in OFFSET clause')
            else:
                logging.log(LOG_LEVEL, 'Set offset to %i', self.__offset)
        else:
            self.__Error('Non-number offset in OFFSET clause')
    # OFFSET is optional; continue with the next clause either way.
    return self.__Hint()
def _last_page_number_in_search(response):
    """Get last page number of search results.

    Returns 0 when the site explicitly reports no matches, 1 when results
    exist but no pagination widget is present, otherwise the number shown
    on the last pagination link.
    """
    try:
        # to get the last page number
        last_page = response.xpath('//ul[@data-id="SearchResultsPagination"]/li[last()-1]/a/div/text()').extract()
        return int(last_page[0])
    except IndexError:
        # if there is no page number, get the reason from the page
        reason = response.xpath('//p[@class="text-lead"]/text()').extract()
        # and if it contains the key words set last page equal to 0
        if reason and ('find any results that matched your criteria' in reason[0]):
            logging.log(logging.DEBUG, 'No results on page' + response.url)
            return 0
        else:
            # otherwise we can conclude that the page
            # has results but that there is only one page.
            return 1
def kill_ports(ports):
    """Force-kill every process listening on each of *ports*.

    Shells out to `lsof -i:PORT` to find PIDs and `kill -9` to stop them,
    logging each step via the module-level log() helper.
    """
    for port in ports:
        log('kill %s start' % port)
        popen = subprocess.Popen('lsof -i:%s' % port, shell=True,
                                 stdout=subprocess.PIPE)
        (data, err) = popen.communicate()
        # BUG FIX: on Python 3 communicate() returns bytes, which broke the
        # str-pattern re.findall below; decode defensively.
        if isinstance(data, bytes):
            data = data.decode('utf-8', 'replace')
        log('data:\n%s \nerr:\n%s' % (data, err))
        pattern = re.compile(r'\b\d+\b', re.S)
        pids = re.findall(pattern, data)
        log('pids:%s' % str(pids))
        for pid in pids:
            # findall never yields None; an empty string is the only
            # falsy case worth guarding.
            if pid:
                try:
                    log('pid:%s' % pid)
                    popen = subprocess.Popen('kill -9 %s' % pid, shell=True,
                                             stdout=subprocess.PIPE)
                    (data, err) = popen.communicate()
                    log('data:\n%s \nerr:\n%s' % (data, err))
                # BUG FIX: 'except Exception, e' is Python-2-only syntax;
                # 'as' form works on 2.6+ and 3.x.
                except Exception as e:
                    log('kill_ports exception:%s' % e)
        log('kill %s finish' % port)
        time.sleep(1)
def Resolve(self, env, resolved_params): logging.log(1, 'Resolving ' + self.name) # TODO(byungchul): Support names in different modules. states = env['_current_module'].states if self.name not in states: did_you_mean = stl.levenshtein.closest_candidate(self.name, states.keys()) raise NameError('Cannot find a state to expand: %s. Did you mean %s?' % (self.name, did_you_mean)) found = states[self.name] if len(self.param_values) != len(found.params): raise TypeError('Wrong number of parameters: %s. ' 'Found %d params, expected %d params.' % (found.name, len(found.params), len(self.param_values))) resolved_state = StateResolved(self.name, found) for v in self.param_values: resolved_state.resolved_params.append(v.Resolve(env, resolved_params)) for v in found.values: if self.value == v: return StateValue(resolved_state, v) did_you_mean = stl.levenshtein.closest_candidate(self.value, found.values) raise NameError('Invalid value in state %s: %s. Did you mean %s?' % (self.name, self.value, did_you_mean))
def Match(self, encoded):
    """Whether or not |encoded| is compatible with this message instance.

    If |encoded| has all required fields, and values of all fields are same
    to those of this message instance, it is compatible. Otherwise, i.e
    1) it doesn't have some required fields
    2) it has some values of fields different from specified in
       |value_dict| of this message instance

    Args:
      encoded: A string expected to be encoded with same encoding method of
          this message instance.

    Returns:
      Whether or not |encoded| is compatible with this message instance.
    """
    # Level 1 is below DEBUG — very verbose trace output.
    logging.log(1, 'Decoding %s: %s', self.name, encoded)
    decoded = self.msg.encoding.ParseFromString(encoded, self.msg)
    logging.info('Matching message value:\nExpected: %s\nActual: %s\n',
                 self.value_dict_or_array, decoded)
    return MessageValue._MatchValue(self.value_dict_or_array, decoded)
def log(cls, level, message, caller=None):
    """Write *message* at the named *level* (e.g. 'ERROR'), tagging the caller.

    Lazily instantiates the underlying logger on first use.  *caller* may
    supply a pre-computed (module, function) pair; otherwise it is derived
    from the call stack.  Invalid level names are themselves logged as errors.
    """
    if not cls.logger:
        cls.instantiate(logLevel=app.config['LEVELOFLOG'])
    try:
        # BUG FIX: logging._levelNames is a private attribute that was
        # removed in Python 3.  getLevelName() maps a valid level name to
        # its numeric value portably (and returns a string for bad names).
        logLevel = logging.getLevelName(level)
        if not isinstance(logLevel, int):
            cls.log("ERROR", 'Invalid file level \'%s\'' % (level))
            return
        if not caller:
            callers = Log.getCallers(inspect.stack())
        else:
            callers = caller
        message = '%s.%s - %s' % (callers[0], callers[1], message)
        cls.logger.log(logLevel, message)
    # BUG FIX: 'except Exception, e' and the print statement were
    # Python-2-only syntax.
    except Exception as e:
        print('Unable to record the log. Error: %s' % (e))
def teardown_integration_test(self):
    """
    Stops the broker running in the separate process.

    Tries a graceful stop first; on failure escalates to terminate(), and
    exits the whole test run if the process still will not die.

    :return:
    """
    try:
        stop_broker()
        if self.broker_process is not None:
            self.broker_process.join(timeout=_BROKER_SHUTDOWN_TIMEOUT_TIME)
    except Exception as _:
        # Graceful stop failed; fall back to killing the process.
        logging.log(msg="Broker didn't shut down. Killing broker process.",
                    level=logging.WARNING)
        self.broker_process.terminate()
        self.broker_process.join(timeout=_BROKER_SHUTDOWN_TIMEOUT_TIME)
        if self.broker_process.is_alive():
            # A zombie broker would poison every later test; abort the run.
            logging.log(msg="Broker won't terminate. Integration test exiting.",
                        level=logging.ERROR)
            sys.exit(1)
    self.broker_process = None
def click(self, level=None):
    """Hit the coupon URL once, optionally logging the page feedback.

    When *level* is given, the page's div.content (or, failing that, the
    <title>) text is logged at that level.  NOTE(review): the literal
    fallback message was mangled to '????' during extraction.
    """
    try:
        resp = self.sess.get(self.coupon_url, timeout=5)
        # Use identity comparison for None per PEP 8 ('!= None' was fragile).
        if level is not None:
            soup = bs4.BeautifulSoup(resp.text, "html.parser")
            tag1 = soup.select('title')
            tag2 = soup.select('div.content')
            if len(tag2):
                logging.log(level, u'{}'.format(tag2[0].text.strip(' \t\r\n')))
            else:
                if len(tag1):
                    logging.log(level, u'{}'.format(tag1[0].text.strip(' \t\r\n')))
                else:
                    logging.log(level, u'????')
    # BUG FIX: 'except Exception, e' is Python-2-only syntax.
    except Exception as e:
        if level is not None:
            logging.log(level, 'Exp {0} : {1}'.format(FuncName(), e))
            return 0
        else:
            return 1
def dir_from_output(output):
    """Get library directory based on the output of clang.

    Args:
        output (str): raw output from clang

    Returns:
        str: path to folder with libclang, or None on an unknown platform
    """
    log.debug("real output: %s", output)
    system = platform.system()
    if system == "Darwin":
        # [HACK] uh... I'm not sure why it happens like this...
        folder_to_search = path.join(output, '..', '..')
        log.debug("folder to search: %s", folder_to_search)
        return folder_to_search
    if system == "Windows":
        log.debug("architecture: %s", platform.architecture())
        return path.normpath(output)
    if system == "Linux":
        return path.normpath(path.dirname(output))
    return None
def test_cont_feature(self, query, feature, epsilon, min_val=None, max_val=None):
    """Find splits on a continuous feature.

    Queries the model at the feature's min and max (or the given bounds),
    then line-searches between them for every threshold where the predicted
    leaf changes, to within *epsilon*.  Returns the sorted thresholds.
    """
    if min_val is None:
        min_val = feature.min_val
    if max_val is None:
        max_val = feature.max_val
    query_max = make_query(query, feature.name, max_val)
    max_id = self.predict(query_max)
    query_min = make_query(query, feature.name, min_val)
    min_id = self.predict(query_min)
    logging.log(DEBUG, '\tmin val {} got {}'.format(min_val, min_id))
    logging.log(DEBUG, '\tmax val {} got {}'.format(max_val, max_id))
    # search for any splitting thresholds
    thresholds = sorted(self.line_search(query, feature.name, min_val,
                                         max_val, min_id, max_id, epsilon))
    logging.log(DEBUG, '\tthresholds: {}'.format(thresholds))
    return thresholds
def test_cat_feature(self, query, feature, categories=None):
    """Find splits on a categorical feature.

    Returns a dict mapping each predicted leaf ID to the list of category
    values that lead to it.
    """
    if not categories:
        categories = feature.vals
    # map of a leaf's ID to all the values that lead to it
    cat_ids = {}
    for val in categories:
        # test each value one after the other
        query_cat = make_query(query, feature.name, val)
        cat_id = self.predict(query_cat)
        logging.log(DEBUG, '\t val {} got {}'.format(val, cat_id))
        if cat_id in cat_ids:
            cat_ids[cat_id].append(val)
        else:
            cat_ids[cat_id] = [val]
    return cat_ids
def merge_all_preds(self, preds):
    """Attempt to merge predicate paths for a given leaf identity.

    Repeatedly pops a predicate and tries to merge it with each remaining
    one; merge products are pushed back for further merging, and predicates
    that merge with nothing new are emitted as final.
    """
    merged = []
    while preds:
        pred1 = preds.pop()
        found_merge = False
        # Iterate over a copy because successful merges append to preds.
        for pred2 in copy(preds):
            try:
                pred3 = self.merge_preds(pred1, pred2)
                logging.log(DEBUG, 'merged to {}'.format(pred3))
                if pred3 == pred1 or pred3 == pred2:
                    # Merge produced nothing new; don't count it.
                    logging.log(DEBUG, 'no new merge...')
                    continue
                preds += [pred3]
                found_merge = True
            except ValueError:
                # These two predicates are incompatible; try the next pair.
                pass
        if not found_merge:
            merged += [pred1]
    return merged
def _trace(msg, *args, **kw):
    # Emit at the custom TRACE level.  logging.TRACE is not part of the
    # stdlib — it is presumably registered elsewhere in this project via
    # logging.addLevelName; TODO confirm.
    logging.log(logging.TRACE, msg, *args, **kw)
def trace(self, msg, *args, **kw):
    # Logger-method convenience wrapper: emit at the custom TRACE level
    # (logging.TRACE is not standard; assumed registered elsewhere).
    self.log(logging.TRACE, msg, *args, **kw)
def check_call(*args, **kwargs):
    """Log the command line at INFO, then delegate to subprocess.check_call."""
    command_text = " ".join(*args)
    logging.log(logging.INFO, "subprocess check_call: %s" % command_text)
    return subprocess.check_call(*args, **kwargs)
def call(*args, **kwargs):
    """Log the command line at INFO, then delegate to subprocess.call."""
    command_text = " ".join(*args)
    logging.log(logging.INFO, "subprocess call: %s" % command_text)
    return subprocess.call(*args, **kwargs)
def log_info(text):
    # Emit at the application's custom APP_LEVEL, which is defined
    # elsewhere in this module.
    logging.log(APP_LEVEL, text)
def parse(self, response):
    """Extract and log the movie title from a content page.

    The title is the text captured between two delimiter characters.
    NOTE(review): both delimiter literals were mangled to '?' during
    extraction — verify the real characters against the original source.
    """
    content_info = response.css('div.contentinfo')
    symbol1 = u'?'
    symbol2 = u'?'
    rex = r'%s(.*)%s' % (symbol1, symbol2)
    logging.log(logging.INFO, "rex=" + rex)
    title = content_info.css('h1 a::text').re_first(rex)
    logging.log(logging.INFO, title)
def parse_movie(self, response):
    """Build a Movie item from a detail page: title, cover image, detail
    text, thumbnail images, and download links."""
    content_info = response.css('div.contentinfo')
    movie = Movie()
    # Title sits between two delimiter characters.  NOTE(review): the
    # delimiter literals were mangled to '?' in extraction — verify.
    symbol1 = u'?'
    symbol2 = u'?'
    rex = r'%s(.*)%s' % (symbol1, symbol2)
    movie['title'] = content_info.css('h1 a::text').re_first(rex)
    # logging.log(logging.INFO, "parse_movie " + movie['title'])
    text = content_info.css('div#text')
    t_msg_font = text.css('div.t_msgfont')
    # Some pages wrap the body in div.t_msgfont, others in plain <p> tags.
    if len(t_msg_font) > 0:
        movie['cover'] = t_msg_font.css(' img::attr(src)').extract_first()
        movie['detail'] = self.parse_detail(t_msg_font.css('::text'))
    else:
        movie['cover'] = text.css(' p img::attr(src)').extract_first()
        movie['detail'] = self.parse_detail(text.css(' p::text'))
    thumbnails = text.css(' p img::attr(src)').extract()
    # The cover appears among the <p> images; don't duplicate it.
    if movie['cover'] in thumbnails:
        thumbnails.remove(movie['cover'])
    movie['thumbnails'] = thumbnails
    download_links = text.css(' table tbody tr td a')
    download_links_array = []
    for link_item in download_links:
        download_link = DownloadLink()
        download_link['title'] = link_item.css('::text').extract_first()
        download_link['link'] = link_item.css('::attr(href)').extract_first();
        download_links_array.append(dict(download_link))
    movie['download_links'] = download_links_array
    return movie