我们从 Python 开源项目中，提取了以下 18 个代码示例，用于说明如何使用 settings.DEBUG。
def parse_xbee_rf_data(data):
    """Turn one XBee RF frame into sensor payloads, printing and posting each.

    ``data`` is indexed by ``'source_addr_long'``, ``'source_addr'`` and
    ``'rf_data'``. Every ``{...}`` fragment found in ``rf_data`` is decoded as
    JSON, and each key/value pair of it is written into the payload as
    ``sensor``/``val``. The payload is printed and, when ``PRODUCTION`` is
    truthy, passed to ``sendHTTPPost``.

    NOTE(review): the source lost its indentation; printing/sending once per
    key/value pair is the assumed nesting -- confirm against the project.
    """
    if DEBUG:
        print(data)

    long_addr_hex = binascii.b2a_hex(data['source_addr_long'])
    short_addr_hex = binascii.b2a_hex(data['source_addr'])
    payload = {'node': long_addr_hex + short_addr_hex}

    # Non-greedy match: one hit per brace-delimited key/value fragment.
    for fragment in re.findall("{.*?}", data['rf_data']):
        for key, value in json.loads(fragment).iteritems():
            name = key.encode('utf-8')
            reading = value.encode('utf-8')
            # The key "temp:" (with a trailing colon) is normalised to "temp".
            payload['sensor'] = "temp" if name == "temp:" else name
            payload['val'] = reading

            # now pass the payload to the RequestBuilder and send it
            print(payload)
            if PRODUCTION:
                sendHTTPPost(payload)
def ask_auth(*dargs, fail_callback=None, pass_performer=False):
    """Build a decorator that asks for authorization before running a function.

    Args:
        *dargs: Passed, as one tuple, to ``AuthPromptWindow``.
        fail_callback: Optional zero-argument callable invoked when the prompt
            reports that the user is not authorized.
        pass_performer: When true, the authorized ``prompt.user`` is handed to
            the decorated function as the ``_performer`` keyword argument.

    Returns:
        A decorator. The wrapped callable returns whatever the decorated
        function returns, or ``None`` when authorization is refused.
    """
    # Imported locally so the block does not rely on a module-level import.
    from functools import wraps

    def decorator(func):
        """Wrap ``func`` behind the authorization prompt."""

        @wraps(func)  # keep func's name/docstring on the wrapper
        def wrapper(*args, **kwargs):
            """Run ``func`` directly in DEBUG, otherwise only once authorized."""
            if settings.DEBUG:
                # No prompt at all while debugging.
                return func(*args, **kwargs)

            prompt = AuthPromptWindow(dargs)
            if prompt.is_authorized:
                if pass_performer:
                    kwargs["_performer"] = prompt.user
                # Propagate the result instead of silently dropping it.
                return func(*args, **kwargs)

            if prompt.show_error:
                gui.utils.error("Error", "Erreur d'authentification")
            if fail_callback is not None:
                fail_callback()
            return None

        return wrapper

    return decorator
def posts_fetcher(self):
    """Poll every bot in ``self.bots`` forever and save the updates it reports.

    Any exception raised while handling one bot is formatted together with
    its traceback and sent through ``self.send_error``; the loop then moves
    on to the next bot. The method never returns.

    NOTE(review): indentation was lost in the source; sleeping once per full
    pass over the bots (rather than once per bot) is assumed -- confirm.
    """
    while True:
        for bot in self.bots:
            try:
                if settings.DEBUG:
                    started_at = datetime.datetime.utcnow()
                    print('Checking for news {} on bot type {}-{}'.format(
                        started_at, bot.web_type, bot.target_url))

                # Fall back to the default chat when the bot has none.
                target_chat = bot.chat_id or self.chat
                saved_posts = bot.save_last_updates(target_chat)

                if settings.DEBUG:
                    print('-- {} New posts saved found'.format(len(saved_posts)))
            except Exception as e:
                trace_lines = traceback.format_exception(*sys.exc_info())
                report = 'Save_updates ERROR: {}\n'.format(e) + "".join(trace_lines)
                self.send_error(report)

        gevent.sleep(self.wait_seconds)
def pre_schedule_hook(self):
    """Set the post interval outside DEBUG and schedule repeated ID clearing."""
    # When DEBUG is off, take the post interval from RedditSetup.
    if not DEBUG:
        self.post_interval = RedditSetup.post_interval
    # Register self.store.clear_ids as a repeating job, first run at offset 0.
    # NOTE(review): indentation was lost in the source; this call is assumed
    # to run regardless of DEBUG -- confirm against the project.
    self.scheduler.run_repeating(self.store.clear_ids, interval=RedditSetup.db_clear_interval, first=0)
def main(debug=settings.DEBUG, url="127.0.0.1"):
    """Attach DEBUG-level file and stream handlers to the 'werkzeug' logger, then run ``app``.

    Args:
        debug: Value assigned to ``app.debug``; defaults to ``settings.DEBUG``
            as evaluated when this function is defined.
        url: Passed as the single positional argument to ``app.run``.
    """
    werkzeug_log = logging.getLogger('werkzeug')
    werkzeug_log.setLevel(logging.DEBUG)

    # Log file is opened in append mode; the stream handler uses its default stream.
    handlers = (logging.FileHandler("log.log", "a"), logging.StreamHandler())
    for handler in handlers:
        handler.setLevel(logging.DEBUG)
        werkzeug_log.addHandler(handler)

    app.debug = debug
    app.run(url)
def get_token():
    """POST the module-level ``payload`` to ``TOKEN_URL`` and return the ``'token'`` entry.

    The response object and its text are always printed; the decoded
    dictionary is printed as well when ``VERBOSE`` is set.
    """
    response = requests.post(TOKEN_URL, data=payload)
    print(response)
    print(response.text)

    decoded = json.loads(response.text)
    token = decoded['token']

    if VERBOSE:
        print(decoded)
    if DEBUG:
        # TODO move to tests
        assert token is not None

    return token
async def create_app(loop):
    """Build and return the configured ``web.Application``.

    Declared ``async`` because the database pool is created with ``await``;
    a plain ``def`` containing ``await`` does not compile.

    Args:
        loop: Event loop handed to both ``web.Application`` and
            ``asyncpg.create_pool``.

    Returns:
        The application, with templates, session storage, flash middleware,
        the two routes, the ``'db'`` pool and ``aiohttp_login`` set up.
    """
    app = web.Application(loop=loop, debug=settings.DEBUG)
    setup_jinja(app, settings.DEBUG)

    # Sessions live in an encrypted cookie keyed by the configured secret.
    aiohttp_session.setup(app, EncryptedCookieStorage(
        settings.SESSION_SECRET.encode('utf-8'),
        max_age=settings.SESSION_MAX_AGE))
    app.middlewares.append(aiohttp_login.flash.middleware)

    app.router.add_get('/', handlers.index)
    app.router.add_get('/users/', handlers.users, name='users')

    # The pool must exist before aiohttp_login is given its storage.
    app['db'] = await asyncpg.create_pool(dsn=settings.DATABASE, loop=loop)
    aiohttp_login.setup(app, AsyncpgStorage(app['db']), settings.AUTH)
    return app
def __call__(self, method):
    """Decorate ``method`` so each call's duration is logged at DEBUG level.

    Timing only happens while ``self.logger.level`` is at or below
    ``logging.DEBUG`` (checked on every call); otherwise ``method`` is called
    straight through. The wrapped call's return value is always passed back.
    """
    from functools import wraps

    @wraps(method)
    def timed(*args, **kw):
        # Fast path: logger is not at DEBUG, so skip the clock entirely.
        if self.logger.level > logging.DEBUG:
            return method(*args, **kw)

        started = time.time()
        outcome = method(*args, **kw)
        finished = time.time()

        # Hard-wired switch: set to True to include the call arguments.
        include_args = False
        if include_args:
            positional = ", ".join(["%s" % (a,) for a in args])
            self.logger.debug('Ran %r (%s, %r) in %2.2fs' %
                              (method.__name__, positional, kw, finished - started))
        else:
            self.logger.debug('Ran %r in %2.2fs' %
                              (method.__name__, finished - started))
        return outcome

    return timed
def __init__(self, label = ""):
    """Store the Django connection and ``label``, and set up ``self.logger``.

    The logger is disabled whenever ``self.enabled`` is false. When enabled
    and the logger has no handlers yet, it is set to DEBUG and given a
    ``FileHandler`` writing to ``dbperf.log``.
    """
    # Avoid importing this at module scope in order
    # to co-habit with chroma_settings()
    from django.db import connection

    self.connection = connection
    self.label = label
    self.logger.disabled = not self.enabled

    # Nothing more to do when disabled or when a handler is already attached.
    if not self.enabled or self.logger.handlers:
        return

    self.logger.setLevel(logging.DEBUG)
    self.logger.addHandler(logging.FileHandler('dbperf.log'))
def __enter__(self):
    """Record the start time and current query count when ``settings.DEBUG`` is on.

    Returns ``None`` in every case.

    NOTE(review): indentation was lost in the source; both assignments are
    assumed to be guarded by the DEBUG check -- confirm.
    """
    if not settings.DEBUG:
        return

    self.t_initial = time.time()
    self.q_initial = len(self.connection.queries)
def test_debug(self):
    """With ``settings.DEBUG`` on, calling ``self.func`` must mark it as called."""
    settings.DEBUG = True
    try:
        self.func()
        self.assertTrue(self.func_called)
    finally:
        # Switch DEBUG back off even when the call or the assertion fails,
        # so a failure here cannot leak DEBUG=True into later tests.
        settings.DEBUG = False
def init():
    """Create the engine on the current event loop and bind it to the global ``engine``.

    Blocks until ``create_engine(DSN, echo=settings.DEBUG)`` has completed.
    """
    global engine

    event_loop = asyncio.get_event_loop()
    engine_setup = create_engine(DSN, echo=settings.DEBUG)
    engine = event_loop.run_until_complete(engine_setup)
def jsonify_(data):
    """Serialise ``data`` to JSON and wrap it in an ``application/json`` ``Response``.

    When ``settings.JSON_KEYCASE`` is set, it names a ``stringcase`` function
    that is applied through ``keycase_convert``. An ``AttributeError`` during
    that step is logged as a warning and ``data`` is left as it was. Output is
    indented by 4 in DEBUG and uses compact separators otherwise.
    """
    keycase = settings.JSON_KEYCASE
    if keycase:
        try:
            converter = getattr(stringcase, keycase)
            data = keycase_convert(data, converter)
        except AttributeError:
            log.warning(u'%s keycase is not supported, response default json. '
                        u'Supported keycase: %s' % (keycase, get_support_keycase()))

    # Pretty-print for debugging, compact otherwise.
    if settings.DEBUG:
        dump_options = {'indent': 4}
    else:
        dump_options = {'separators': [',', ':']}
    js = json.dumps(data, ensure_ascii=False, **dump_options)

    return Response(js, mimetype='application/json')
def register_decorators_on_module_funcs(modules, decorators):
    '''Apply the decorator(s) to the public functions defined in the module(s).

    A function is decorated only if it is a plain function, its name does not
    start with ``_``, and its ``__module__`` equals the module's ``__name__``
    (so imported functions are left alone). Functions carrying a truthy
    ``__nodeco__`` attribute are skipped.

    eg:
        def func():
            pass
        func.__nodeco__ = True

    Args:
        modules: One module, or a list/tuple of modules.
        decorators: One decorator, or a list/tuple of decorators; they are
            applied in the given order, so the last one ends up outermost.
    '''
    if not isinstance(modules, (list, tuple)):
        modules = [modules]
    if not isinstance(decorators, (list, tuple)):
        decorators = [decorators]

    for m in modules:
        namespace = vars(m)
        # Iterate over a snapshot: entries are rebound below, and items()
        # (unlike iteritems()) exists on both Python 2 and Python 3.
        for funcname, func in list(namespace.items()):
            if not isinstance(func, types.FunctionType):
                continue
            if funcname.startswith('_') or func.__module__ != m.__name__:
                continue
            if getattr(func, '__nodeco__', False):
                continue

            for deco in decorators:
                if settings.DEBUG:
                    log.debug('register %s on %s.%s' % (deco.__name__, m.__name__, funcname))
                func = deco(func)
            namespace[funcname] = func
def get(ID, cursor):
    """Return ``cursor.get(ID)``, printing a trace line first when ``DEBUG`` is set."""
    if DEBUG:
        print(" >>> running piper_pickledb get ")
    record = cursor.get(ID)
    return record

#########
#  EOF  #
#########
def get(ID, cursor):
    """Return ``cursor.find_one`` for ``_id == ID``, printing a trace line first when ``DEBUG`` is set."""
    if DEBUG:
        print(" >>> running piper_mongodb get ")
    query = {"_id": ID}
    return cursor.find_one(query)

#########
#  EOF  #
#########
def telegram_posts_sender(self):
    """Send pending posts to their Telegram chats forever, in chunks per chat.

    Each pass fetches the posts from ``Crawler.get_post_to_send()``, groups
    them by ``to_send_id`` and sends at most ``self.post_chunk`` posts per
    group. Every processed post is marked ``'SENT'`` or ``'ERROR'`` and the
    chunk is saved through ``Crawler.save_posts``. The method never returns.

    Sleep between passes:
        * 3 seconds when something was pending and no flood limit was hit;
        * otherwise ``self.wait_seconds // 3 + 1`` seconds, raised to the
          largest ``retry_after`` reported by a ``RetryAfter`` error.

    Previously the 3-second branch was taken whenever anything was pending,
    so the ``retry_after`` back-off was computed but never applied.
    """
    while True:
        time_to_wait = (self.wait_seconds // 3) + 1
        flood_limited = False  # set once a RetryAfter is seen in this pass

        posts_to_sent = Crawler.get_post_to_send()
        # NOTE(review): groupby only merges adjacent items; this presumably
        # relies on get_post_to_send() returning posts ordered by to_send_id.
        grouped_posts = groupby(posts_to_sent, key=lambda x: x.to_send_id)

        pending_msgs_to_send = False
        for chat_id, posts in grouped_posts:
            pending_msgs_to_send = True
            posts = list(posts)[:self.post_chunk]
            if settings.DEBUG:
                print('Sending {} new posts to chat {}'.format(len(posts), chat_id))

            for post in posts:
                try:
                    self.telegram.sendMessage(chat_id=chat_id, text=post.description)
                    if post.image:
                        try:
                            self.telegram.sendPhoto(chat_id=chat_id, photo=post.image)
                        except BadRequest:
                            # A rejected picture does not fail the post itself.
                            self.send_error('ERROR sending picture to {}'.format(post))
                    post.status = 'SENT'
                except RetryAfter as e:
                    # Flood control exceeded. Retry in 175 seconds
                    self.send_error('RetryAfter error, waiting {} seconds'.format(e.retry_after))
                    time_to_wait = max(e.retry_after, time_to_wait)
                    flood_limited = True
                    post.status = 'ERROR'
                except Exception as e:
                    error_stack = 'Send_updates ERROR: {}\n'.format(e)
                    error_stack += "".join(traceback.format_exception(*sys.exc_info()))
                    self.send_error(error_stack)
                    post.status = 'ERROR'

            Crawler.save_posts(posts)

        # Only take the short pause when Telegram did not ask us to back off.
        if pending_msgs_to_send and not flood_limited:
            sleep_time = 3
        else:
            sleep_time = time_to_wait
        gevent.sleep(sleep_time)
def handle(self, *args, **kwargs):
    """
    Generate config files for running nginx, and send the nginx command line to stdout.

    The reason for sending the command line to stdout instead of just running it is so that
    supervisord can directly manage the resulting nginx process (otherwise we would have to
    handle passing signals through).

    Both templates are rendered into a ``dev_nginx`` directory under the site
    root, which is created if missing. Raises ``AssertionError`` when
    ``settings.DEBUG`` is false.
    """
    from chroma_core.lib.util import site_dir

    # This command is only for development
    assert settings.DEBUG

    SITE_ROOT = site_dir()
    join_site_root = partial(os.path.join, SITE_ROOT)

    DEV_NGINX_DIR = join_site_root("dev_nginx")
    join_nginx_dir = partial(join_site_root, DEV_NGINX_DIR)

    NGINX_CONF_TEMPLATE = join_site_root("nginx.conf.template")
    NGINX_CONF = join_nginx_dir("nginx.conf")

    CHROMA_MANAGER_CONF_TEMPLATE = join_site_root("chroma-manager.conf.template")
    CHROMA_MANAGER_CONF = join_nginx_dir("chroma-manager.conf")

    if not os.path.exists(DEV_NGINX_DIR):
        os.makedirs(DEV_NGINX_DIR)

    def write_conf(template_path, conf_path):
        """Render the template at ``template_path`` and write it to ``conf_path``."""
        # Context managers close both files promptly instead of leaving
        # that to garbage collection.
        with open(template_path) as template_file:
            template_text = template_file.read()

        conf_text = Template(template_text).render(Context({
            'var': DEV_NGINX_DIR,
            'log': SITE_ROOT,
            'SSL_PATH': settings.SSL_PATH,
            'APP_PATH': settings.APP_PATH,
            'REPO_PATH': settings.DEV_REPO_PATH,
            'HTTP_FRONTEND_PORT': settings.HTTP_FRONTEND_PORT,
            'HTTPS_FRONTEND_PORT': settings.HTTPS_FRONTEND_PORT,
            'HTTP_AGENT_PORT': settings.HTTP_AGENT_PORT,
            'HTTP_API_PORT': settings.HTTP_API_PORT,
            'REALTIME_PORT': settings.REALTIME_PORT,
            'VIEW_SERVER_PORT': settings.VIEW_SERVER_PORT
        }))

        with open(conf_path, 'w') as conf_file:
            conf_file.write(conf_text)

    write_conf(NGINX_CONF_TEMPLATE, NGINX_CONF)
    write_conf(CHROMA_MANAGER_CONF_TEMPLATE, CHROMA_MANAGER_CONF)

    print(" ".join([self._nginx_path, "-c", NGINX_CONF]))