我们从 Python 开源项目中提取了以下 40 个代码示例，用于说明如何使用 flask.current_app.debug（这是一个属性，而不是可调用的方法）。
def logging_levels():
    """
    Temporarily switch the root logger to DEBUG for a single request.

    Reads the `X-Request-Debug` request header; when it parses as true, the
    root logger's effective level is remembered, DEBUG is applied while the
    managed block runs, and the remembered level is put back when the block
    exits (normally or through an exception). Without the header the logger
    is left untouched.

    This is a single-yield generator, presumably wrapped with
    `contextlib.contextmanager` where it is defined -- confirm at the
    definition site.

    """
    debug_requested = strtobool(request.headers.get("x-request-debug", "false"))
    saved_level = None
    try:
        if debug_requested:
            root_logger = getLogger()
            saved_level = root_logger.getEffectiveLevel()
            root_logger.setLevel(DEBUG)
        yield
    finally:
        # restore only if we actually changed something
        if debug_requested:
            getLogger().setLevel(saved_level)
def capture_request(self):
    """
    Store the parsed JSON request body on ``self.request_body`` when allowed.

    Nothing is captured unless the app is in debug mode and
    ``self.options.include_request_body`` is truthy. When that option is a
    value other than ``True`` it acts as a size threshold: a request whose
    ``Content-Length`` is greater than or equal to it is skipped. Bodies that
    do not parse to a truthy JSON value are skipped as well.

    """
    # request bodies are only ever captured in debug mode
    if not current_app.debug:
        return

    # ... and only when the option asks for it
    capture_setting = self.options.include_request_body
    if not capture_setting:
        return

    # a non-True setting is a size threshold: skip bodies that reach it
    declared_length = request.content_length
    if declared_length and capture_setting is not True and declared_length >= capture_setting:
        return

    # silent probe first so a non-JSON body is ignored instead of raising
    probe = request.get_json(force=True, silent=True)
    if not probe:
        return

    self.request_body = request.get_json(force=True)
def redirect_to_ssl(self):
    """
    Redirect a plain-HTTP request to the same URL over HTTPS.

    No redirect is issued (``None`` is returned) when the request is already
    secure, the app is in debug mode, the ``X-Forwarded-Proto`` header says
    ``https``, ``self.skip`` is truthy, or the URL does not start with
    ``http://``. Otherwise a redirect response is returned with status 301
    when ``self.permanent`` is truthy and 302 when it is not.
    """
    already_secure = request.is_secure
    debugging = current_app.debug
    proxied_https = request.headers.get('X-Forwarded-Proto', 'http') == 'https'

    # Should we redirect?
    if already_secure or debugging or proxied_https or self.skip:
        return None
    if not request.url.startswith('http://'):
        return None

    target = request.url.replace('http://', 'https://', 1)
    status = 301 if self.permanent else 302
    return redirect(target, code=status)
def dbuser_add(arg_userid, arg_password, arg_email):
    """
    Insert a user record into the user table.

    The row holds the user id, password, e-mail address and a timestamp
    produced by ``epoch2dbfmt(time.time())``.

    :param arg_userid: value stored in the ``ddl.FLD_USER_ID`` column
    :param arg_password: value stored in the ``ddl.FLD_USER_PSWD`` column
    :param arg_email: value stored in the ``ddl.FLD_USER_EMAIL`` column
    :return: ``True`` once the insert is committed, ``False`` when sqlite
        reports an error (the error is logged).
    """
    if app.debug:
        app.logger.debug("dbuser_add: arg_userid=%s, arg_email=%s", arg_userid, arg_email)

    # Table/column names cannot be bound as parameters, so only the `ddl`
    # constants are interpolated. The caller-supplied values are bound with
    # `?` placeholders so that quotes inside them cannot change the statement.
    statement = "INSERT INTO {tn} ('{cn1}', '{cn2}','{cn3}','{cn4}') VALUES (?, ?, ?, ?)".format(
        tn=ddl.TBL_USER,
        cn1=ddl.FLD_USER_ID,
        cn2=ddl.FLD_USER_PSWD,
        cn3=ddl.FLD_USER_EMAIL,
        cn4=ddl.FLD_USER_TSTAMP,
    )
    try:
        dbcursor = dbconn.cursor()
        stamp = epoch2dbfmt(time.time())
        dbcursor.execute(statement, (arg_userid, arg_password, arg_email, stamp))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_add: INSERT {%s,%s} failed, reason: {%s}", arg_userid, arg_email, repr(e))
        return False
    # Success
    return True

#=====================
# Remove a user record
#=====================
def dbuser_remove(arg_userid):
    """
    Delete the user record(s) whose id column equals ``arg_userid``.

    :param arg_userid: value matched against the ``ddl.FLD_USER_ID`` column
    :return: ``True`` once the delete is committed, ``False`` when sqlite
        reports an error (the error is logged).
    """
    if app.debug:
        app.logger.debug("dbuser_remove: arg_userid=%s", arg_userid)

    # Only the trusted `ddl` identifiers are interpolated; the user id is a
    # bound parameter so quotes inside it cannot change the statement.
    statement = "DELETE FROM {tn} WHERE {cn1}=?".format(
        tn=ddl.TBL_USER,
        cn1=ddl.FLD_USER_ID,
    )
    try:
        dbcursor = dbconn.cursor()
        dbcursor.execute(statement, (arg_userid,))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_remove: DELETE {%s} failed, reason: {%s}", arg_userid, repr(e))
        return False
    # Success
    return True

#========================
# Initialize the database
#========================
def delete_memoized_verhash(self, f, *args):
    """
    Remove the version hash stored for the function ``f``.

    ``f`` must be callable; anything else raises ``DeprecationWarning``.
    Extra positional arguments are accepted but not used here. A failure in
    ``self._memoize_version`` is re-raised in debug mode and otherwise only
    logged.

    ..warning::

        Removing the version hash can leave behind cache keys that were
        created with it. The application has to make sure such keys at least
        carry timeouts so they do not sit orphaned in the cache backend.
    """
    if callable(f):
        try:
            self._memoize_version(f, delete=True)
        except Exception:
            # surface the problem while debugging, stay quiet-but-logged otherwise
            if current_app.debug:
                raise
            logger.exception("Exception possibly due to cache backend.")
        return

    raise DeprecationWarning("Deleting messages by relative name is no longer"
                             " reliable, please use a function reference")
def convert_to_json(data):
    '''
    Serialize ``data`` to a newline-terminated JSON string.

    The data is passed through ``cleandict`` before dumping. Keyword
    arguments for ``dumps`` come from the ``RESTPLUS_JSON`` app setting.
    Adapted from flask_restplus.representations.output_json.
    '''
    json_options = current_app.config.get('RESTPLUS_JSON', {})

    # In debug mode fill in readable defaults; setdefault leaves any value
    # that was configured explicitly untouched.
    if current_app.debug:
        for option, fallback in (('indent', 4), ('sort_keys', True)):
            json_options.setdefault(option, fallback)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    return dumps(cleandict(data), **json_options) + "\n"
def redirect_to_ssl(self):
    """
    Permanently redirect a plain-HTTP request to HTTPS.

    Returns ``None`` (no redirect) when the lower-cased ``User-Agent`` starts
    with ``self.exclude_user_agents``, when the request is already secure,
    when the app is in debug or testing mode, when ``X-Forwarded-Proto`` is
    ``https``, or when the URL does not start with ``http://``. Otherwise a
    301 redirect to the ``https://`` form of the URL is returned.
    """
    already_secure = request.is_secure
    debugging = current_app.debug
    testing = current_app.testing
    proxied_https = request.headers.get('X-Forwarded-Proto', 'http') == 'https'

    user_agent = request.headers.get('User-Agent', '').lower()
    if user_agent.startswith(self.exclude_user_agents):
        return None

    if already_secure or debugging or testing or proxied_https:
        return None
    if not request.url.startswith('http://'):
        return None

    return redirect(request.url.replace('http://', 'https://', 1), code=301)
async def worker(channel, queue, token, repo_ids=None, build_ids=None):
    """
    Forward matching pub/sub messages from ``channel`` onto ``queue``.

    Declared ``async`` because the body awaits the channel and the queue; a
    plain ``def`` containing ``await`` does not compile.

    A message is dropped when its repository id is not listed in
    ``token['repo_ids']``, or when the optional ``build_ids`` / ``repo_ids``
    filters are given and do not contain the message's build / repository id.
    Every other message is wrapped in an ``Event`` and put on the queue.

    :param channel: object providing awaitable ``wait_message()`` and
        ``get_json()``; the loop ends when ``wait_message()`` is falsy
    :param queue: object providing awaitable ``put()`` and ``qsize()``
    :param token: mapping with a ``repo_ids`` iterable of allowed repositories
    :param repo_ids: optional container restricting repository ids
    :param build_ids: optional container restricting build ids
    """
    allowed_repo_ids = frozenset(token['repo_ids'])

    while (await channel.wait_message()):
        msg = await channel.get_json()
        data = msg.get('data')
        repo_id = data['repository']['id']

        if repo_id not in allowed_repo_ids:
            continue
        if build_ids and data['id'] not in build_ids:
            continue
        if repo_ids and repo_id not in repo_ids:
            continue

        evt = Event(
            msg.get('id'),
            msg.get('event'),
            data,
        )
        await queue.put(evt)
        current_app.logger.debug(
            'pubsub.event.received qsize=%s', queue.qsize())

# @log_errors
def catch_parade_error(func):
    """
    Decorator that converts exceptions raised by ``func`` into ``abort`` calls.

    * ``ParadeError`` -> ``abort(e.status, code=e.code, message=e.reason)``
    * any other ``Exception`` -> ``abort(500, code=0, message=str(e))``

    When the app is in debug mode the formatted traceback is passed along as
    an extra ``traceback`` keyword. The wrapper keeps the wrapped function's
    name and docstring (``functools.wraps``), which the original did not.

    :param func: the callable to protect
    :return: the wrapping callable
    """
    import functools  # local import so the block does not depend on file-level imports

    def _debug_extras():
        # Extra abort() keywords: the formatted traceback, in debug mode only.
        if not current_app.debug:
            return {}
        exc_type, exc_value, exc_traceback = sys.exc_info()
        return {'traceback': traceback.format_exception(exc_type, exc_value, exc_traceback)}

    @functools.wraps(func)
    def wrapper(*args, **kw):
        try:
            return func(*args, **kw)
        except ParadeError as e:
            abort(e.status, code=e.code, message=e.reason, **_debug_extras())
        except Exception as e:
            # NOTE(review): this also catches any exception `func` raises on
            # purpose (e.g. its own abort) and reports it as a 500 -- confirm
            # that is intended.
            abort(500, code=0, message=str(e), **_debug_extras())

    return wrapper
def output_json(data, code, headers=None):
    """
    Build a Flask response whose body is ``data`` encoded as JSON.

    Keyword arguments for ``dumps`` are taken from the ``RESTFUL_JSON`` app
    setting. The body always ends with a newline, the response carries the
    status ``code``, and ``headers`` (if any) are added to it.
    """
    json_kwargs = current_app.config.get('RESTFUL_JSON', {})

    # Debug mode gets readable output by default; setdefault never overrides
    # a value that was configured explicitly.
    if current_app.debug:
        json_kwargs.setdefault('indent', 4)
        json_kwargs.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    body = dumps(data, **json_kwargs) + "\n"

    response = make_response(body, code)
    extra_headers = headers if headers else {}
    response.headers.extend(extra_headers)
    return response
def setup_client(client):
    """
    Register the message handlers on ``client`` and disconnect it.

    Each handler from the ``handlers`` module is registered for the message
    type names listed next to it, in the order shown. The client is then
    disconnected so that it starts out in a disconnected state.
    """
    registrations = (
        (handlers.connection_handler, ('ManagedAccounts', 'NextValidId')),
        (handlers.history_handler, ('HistoricalData',)),
        (handlers.order_handler, ('OpenOrder', 'OrderStatus', 'OpenOrderEnd')),
        (handlers.portfolio_positions_handler, ('Position', 'PositionEnd')),
        (handlers.account_summary_handler, ('AccountSummary', 'AccountSummaryEnd')),
        (handlers.account_update_handler, ('UpdateAccountTime', 'UpdateAccountValue',
                                           'UpdatePortfolio', 'AccountDownloadEnd')),
        (handlers.contract_handler, ('ContractDetails',)),
        (handlers.executions_handler, ('ExecDetails', 'ExecDetailsEnd', 'CommissionsReport')),
        (handlers.error_handler, ('Error',)),
        # handlers for feeds
        (handlers.market_handler, ('TickSize', 'TickPrice')),
    )
    for handler, message_types in registrations:
        client.register(handler, *message_types)

    # For easier debugging, every message can be routed to the generic
    # handler instead: client.registerAll(handlers.generic_handler)

    # Be sure we're in a disconnected state
    client.disconnect()
def output_json(data, code, headers=None):
    """
    Build a Flask response whose body is ``data`` encoded as JSON.

    Keyword arguments for ``dumps`` are taken from the ``RESTFUL_JSON`` app
    setting. The body always ends with a newline and the ``Content-Type``
    header is always set to ``application/json``. The ``headers`` argument is
    accepted but not applied by this variant.
    """
    json_kwargs = current_app.config.get('RESTFUL_JSON', {})

    # Debug mode gets indented output by default; setdefault never overrides
    # a value that was configured explicitly.
    if current_app.debug:
        for option, fallback in (('indent', 4), ('sort_keys', False)):
            json_kwargs.setdefault(option, fallback)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    response = make_response(dumps(data, **json_kwargs) + "\n", code)

    # Always return as JSON
    response.headers['Content-Type'] = 'application/json'
    return response
def log(self, logger):
    """
    Write this error to ``logger`` at a level that depends on its status.

    Status 500 is logged as a warning: in debug or testing mode the
    ``message`` entry of ``self.to_dict()`` becomes the log message, the
    remaining entries are passed as ``extra`` and exception info is attached;
    otherwise the whole dict is logged. Any other status is logged at INFO.
    """
    if self.status_code != 500:
        # usually log at INFO; a raised exception can be an error or
        # expected behavior (e.g. 404)
        logger.info(self.to_dict())
        return

    # something actually went wrong; investigate
    details = self.to_dict()
    if not (current_app.debug or current_app.testing):
        logger.warning(details)
        return

    message = details.pop("message")
    logger.warning(message, extra=details, exc_info=True)
def capture_response(self, response):
    """
    Record the outcome of a request on this object.

    Always sets ``self.success`` to ``True`` and stores the status code and
    headers returned by ``parse_response``. The JSON-decoded body is stored
    on ``self.response_body`` only in debug mode, when
    ``self.options.include_response_body`` is truthy, the body is non-empty
    and -- if the option is a value other than ``True`` -- the body is
    shorter than that value. Bodies that fail to decode are ignored.
    """
    self.success = True
    body, status_code, response_headers = parse_response(response)
    self.status_code = status_code
    self.response_headers = response_headers

    # response bodies are only ever captured in debug mode
    if not current_app.debug:
        return

    capture_setting = self.options.include_response_body
    if not capture_setting or not body:
        return

    # a non-True setting is a size threshold: skip bodies that reach it
    if capture_setting is not True and len(body) >= capture_setting:
        return

    try:
        self.response_body = loads(body)
    except (TypeError, ValueError):
        # not json
        pass
def verify_email_recipient(arg_recipient):
    """
    Check that ``arg_recipient`` looks like an e-mail address whose domain
    has an MX record.

    The address must match the (lower-case only) pattern below, and a DNS MX
    query for the part after the ``@`` must succeed and return at least one
    record. Progress is written to the debug log when the app is in debug
    mode.

    :param arg_recipient: the address to check
    :return: ``True`` when both checks pass, otherwise ``False``
    """
    def _trace(message, value):
        # debug-only logging helper
        if app.debug:
            app.logger.debug(message, value)

    _trace("verify_email_recipient: email recipient = %s", arg_recipient)

    # Inspect email address (raw string: `\.` is a regex escape, not a
    # Python string escape)
    result = re.match(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', arg_recipient)
    if result is None:
        _trace("verify_email_recipient: Not an email address: %s", arg_recipient)
        return False

    # Extract domain name from arg_recipient. The pattern admits exactly one
    # "@", so the split always yields two pieces.
    domain = arg_recipient.split("@")[1]
    _trace("verify_email_recipient: email domain = %s", domain)

    # Get MX record for target domain
    try:
        records = dns.resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)
    except Exception:
        # any lookup failure means "not deliverable"; unlike a bare except
        # this lets KeyboardInterrupt/SystemExit through
        _trace("verify_email_recipient: DNS MX-query exception with %s", domain)
        return False

    _trace("verify_email_recipient: DNS MX record = %s", mxRecord)
    return True

#======================================
# Convert epoch time to database format
#======================================
def _get_wrap(self, node, classes='form-group'):
    """
    Create the wrapping ``div`` for a form field.

    ``classes`` is used as the div's class attribute, with ``' required'``
    appended when ``node.flags.required`` is truthy. In debug mode a comment
    naming the field and its class is added inside the div.
    """
    # "required" strictly speaking isn't bootstrap, but it is a common
    # enough customization
    css_classes = (classes + ' required') if node.flags.required else classes
    wrapper = tags.div(_class=css_classes)

    if current_app.debug:
        label = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
        wrapper.add(tags.comment(label))

    return wrapper
def logger(node=None):
    '''
    Handle a log submission from a node.

    The JSON request body must carry a ``log_type``:

    * ``status`` -- the payload goes to ``log_tee.handle_status`` and every
      entry whose severity is at least ``DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL``
      is saved as a ``StatusLog`` for the node.
    * ``result`` -- the processed results are saved, then the payload goes to
      ``log_tee.handle_result`` and ``analyze_result.delay``.
    * anything else -- the unknown type is logged as an error.

    In all three cases the node is added to the session and committed. The
    response is always ``jsonify(node_invalid=False)``.
    '''
    data = request.get_json()
    log_type = data['log_type']
    log_level = current_app.config['DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL']

    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))

    if log_type == 'status':
        log_tee.handle_status(data, host_identifier=node.host_identifier)
        # keep only entries at or above the configured minimum severity
        status_logs = [
            StatusLog(node_id=node.id, **item)
            for item in data.get('data', [])
            if not int(item['severity']) < log_level
        ]
        db.session.add(node)
        db.session.bulk_save_objects(status_logs)
        db.session.commit()

    elif log_type == 'result':
        db.session.add(node)
        db.session.bulk_save_objects(process_result(data, node))
        db.session.commit()
        log_tee.handle_result(data, host_identifier=node.host_identifier)
        analyze_result.delay(data, node.to_dict())

    else:
        current_app.logger.error("%s - Unknown log_type %r",
                                 request.remote_addr, log_type
                                 )
        current_app.logger.info(json.dumps(data))
        # still need to write last_checkin, last_ip
        db.session.add(node)
        db.session.commit()

    return jsonify(node_invalid=False)
def handle_netapp_exception(error):
    '''
    Turn ``error`` into a ``(payload, 500)`` pair.

    The payload carries ``error.msg`` and ``error.errno``; in debug mode the
    string form of ``error.failing_query`` is included as well.
    '''
    payload = dict(message=error.msg, errno=error.errno)
    if current_app.debug:
        payload['failing_query'] = str(error.failing_query)
    return payload, 500
def dbgdump(obj, default=None, cls=None):
    """
    Write ``obj`` to the debug log as JSON.

    The output is indented by two spaces when the ``ASK_PRETTY_DEBUG_LOGS``
    app setting is truthy and is compact otherwise. ``default`` and ``cls``
    are handed to ``json.dumps`` unchanged.
    """
    pretty = current_app.config.get('ASK_PRETTY_DEBUG_LOGS', False)
    logger.debug(json.dumps(obj, indent=2 if pretty else None, default=default, cls=cls))
def init_app(self, app, path='templates.yaml'):
    """Attach this Ask instance to a Flask app.

    Stores the instance on ``app.ask``, maps ``self._route`` to the Ask view
    function (POST only), and extends the app's Jinja loader with a
    ``YamlLoader`` for ``path``. Raises ``TypeError`` when no route has been
    set.

    The Ask instance is configured through these Flask configuration
    variables:

    `ASK_APPLICATION_ID`:

        An application ID, or a list of allowed application IDs, to verify
        requests against. Verification is off by default and a warning is
        logged. Set this in production so that only the applications you
        specify can send requests.
        Default: None

    `ASK_VERIFY_REQUESTS`:

        Turns Alexa request verification on or off. Verification ensures
        that requests sent to your skill come from Amazon's Alexa service.
        Do not disable it in production; switching it off is useful for
        mocking JSON requests in automated tests.
        Default: True

    `ASK_VERIFY_TIMESTAMP_DEBUG`:

        Set to True to verify request timestamps while debugging. Timestamp
        verification helps mitigate replay attacks and relies on the system
        clock being synchronized with an NTP server. Do not enable this
        setting in production.
        Default: False

    `ASK_PRETTY_DEBUG_LOGS`:

        Adds tabs and linebreaks to the Alexa request and response printed
        to the debug log. Easier to read on a console, but it breaks
        formatting when logging to CloudWatch.
        Default: False

    """
    route = self._route
    if route is None:
        raise TypeError("route is a required argument when app is not None")

    app.ask = self
    app.add_url_rule(route, view_func=self._flask_view_func, methods=['POST'])

    # keep the app's own loader first, then fall back to the YAML templates
    loaders = [app.jinja_loader, YamlLoader(app, path)]
    app.jinja_loader = ChoiceLoader(loaders)
def _alexa_request(self, verify=True):
    """
    Parse the incoming request body as JSON and optionally verify it.

    With ``verify`` true the certificate named in the request headers is
    loaded and the signature checked against the raw body; the request
    timestamp is checked unless the app is in debug mode without
    ``self.ask_verify_timestamp_debug``; and the application id is checked
    when ``self.ask_application_id`` is set.

    NOTE(review): the flattened source does not show which statements belong
    to the ``if verify:`` block; all checks are placed inside it here --
    confirm against the upstream file.

    :param verify: whether to run the verification steps
    :return: the decoded request payload
    """
    raw_body = flask_request.data
    payload = json.loads(raw_body)

    if not verify:
        return payload

    cert_url = flask_request.headers['Signaturecertchainurl']
    signature = flask_request.headers['Signature']

    # loading the certificate also validates its url and format
    cert = verifier.load_certificate(cert_url)
    verifier.verify_signature(cert, signature, raw_body)

    # timestamp
    raw_timestamp = payload.get('request', {}).get('timestamp')
    timestamp = self._parse_timestamp(raw_timestamp)
    if not current_app.debug or self.ask_verify_timestamp_debug:
        verifier.verify_timestamp(timestamp)

    # application id: under 'session' when present, else under 'context'
    try:
        application_id = payload['session']['application']['applicationId']
    except KeyError:
        application_id = payload['context']['System']['application']['applicationId']
    if self.ask_application_id is not None:
        verifier.verify_application_id(application_id, self.ask_application_id)

    return payload
async def ping(loop, resp, client_guid):
    """
    Send a keep-alive comment to the browser every 15 seconds, forever.

    Declared ``async`` because the body awaits ``asyncio.sleep``; a plain
    ``def`` containing ``await`` does not compile. The coroutine never
    returns on its own and has to be cancelled by its owner.

    :param loop: kept for interface compatibility and no longer passed to
        ``asyncio.sleep``, whose ``loop`` argument was removed in Python
        3.10; the sleep runs on the loop executing this coroutine
    :param resp: response object whose ``write`` receives the ping bytes
    :param client_guid: identifier written to the debug log with each ping
    """
    # periodically send ping to the browser. Any message that
    # starts with ":" colon ignored by a browser and could be used
    # as ping message.
    while True:
        await asyncio.sleep(15)
        current_app.logger.debug('pubsub.ping guid=%s', client_guid)
        resp.write(b': ping\r\n\r\n')

# @log_errors
async def build_server(loop, host, port):
    """
    Create the application, register its routes and start listening.

    Declared ``async`` because the body awaits ``loop.create_server``; a
    plain ``def`` containing ``await`` does not compile.

    The application reuses the current Flask app's logger and debug flag and
    serves ``stream`` on ``/`` and ``health_check`` on ``/healthz``.

    :param loop: event loop used for the application and the server
    :param host: interface to bind
    :param port: port to bind
    :return: whatever ``loop.create_server`` resolves to
    """
    app = Application(loop=loop, logger=current_app.logger,
                      debug=current_app.debug)
    app.router.add_route('GET', '/', stream)
    app.router.add_route('GET', '/healthz', health_check)

    return await loop.create_server(app.make_handler(), host, port)
def serve_swaggerui_assets(path):
    """
    Route that serves a Swagger-UI asset from ``../static/``.

    Outside debug mode a warning is emitted recommending that these assets
    be served by a public-facing server instead.
    """
    import warnings
    from flask import send_from_directory

    serving_in_production = not current_app.debug
    if serving_in_production:
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. NGINX)"
        )

    return send_from_directory('../static/', path)
def get_client():
    """
    Return the shared client connection, connected and ready for orders.

    Waits while ``g.clientId_in_use`` is truthy, polling every half second.
    ``g.timeout`` is the number of polls allowed. The original decremented
    this counter without ever checking it, so a stuck client id blocked the
    caller forever; the wait now gives up once the counter is exhausted.

    Logging is enabled on the client in debug mode, and a disconnected
    client is reconnected once.

    :return: the connected client from ``g.client_connection``
    :raises Exception: when the wait for the client id times out, or when
        the client is still not connected after the reconnect attempt
    """
    # Get client ID from our non-order pool list in memory
    timeout = g.timeout
    while g.clientId_in_use:
        if timeout <= 0:
            raise Exception('Timed out waiting for clientId to become available')
        log.debug('Waiting for clientId to become available...({})'.format(timeout))
        time.sleep(0.5)
        timeout -= 1

    client = g.client_connection

    # Enable logging if we're in debug mode
    if current_app.debug is True:
        client.enableLogging()

    # Reconnect if needed
    if not client.isConnected():
        log.debug('Client {} not connected.  Trying to reconnect...'.format(g.client_id))
        client.disconnect()
        time.sleep(1)
        client.connect()

        # NOTE(review): the original comment says the client ID should be put
        # back in the pool on failure, but no code does so -- confirm.
        if client.isConnected() is False:
            raise Exception('Client cannot connect')

    return client
def recording_enabled():
    """
    Tell whether query recording is switched on.

    Returns the app's debug flag when it is truthy, otherwise the value of
    the ``SQLALCHEMY_RECORD_QUERIES`` setting (``None`` when it is unset).
    """
    debug_flag = current_app.debug
    if debug_flag:
        return debug_flag
    return current_app.config.get('SQLALCHEMY_RECORD_QUERIES')
def handle_all_exceptions(e):
    """
    Convert any exception into a JSON error response.

    Exceptions that are not ``HTTPException`` are "hard" server errors: they
    are logged with ``log.exception`` and answered with status 500. The
    traceback and real description are included only for callers with the
    ``dev`` role or when the app is in debug mode. ``HTTPException``
    instances are answered with their own code, name and description; those
    with a code of 500 or above are additionally logged as warnings.

    For every server-side error a ``context_id`` is added to the response.
    It is taken from the ``request_id`` field of the client's
    ``Drift-Log-Context`` header when that header holds a JSON object, and
    is generated otherwise. A malformed header no longer crashes the handler.

    :param e: the exception being handled
    :return: the response built by ``make_response(jsonify(...), status)``
    """
    is_server_error = not isinstance(e, HTTPException)

    ret = {}
    error = {}
    ret['error'] = error

    if is_server_error or e.code >= 500:
        # Use context_id from the client if it's available, or make one if not.
        # The header is client-controlled: tolerate invalid JSON and
        # non-object payloads instead of failing inside the error handler.
        log_context = {}
        raw_context = request.headers.get("Drift-Log-Context")
        if raw_context:
            try:
                parsed_context = json.loads(raw_context)
            except ValueError:
                parsed_context = None
            if isinstance(parsed_context, dict):
                log_context = parsed_context
        context_id = log_context.get("request_id", str(uuid.uuid4()).replace("-", ""))
        error['context_id'] = context_id
        title = str(e) + " - [{}]".format(context_id)
        splunk_link = 'http://splunk.devnorth.dg-api.com:8000/en-US/app/search/search'
        splunk_link += '?q=search%20sourcetype%3D%22*%22%20%7C%20search%20{}'.format(context_id)
        error['link'] = splunk_link

    if is_server_error:
        # Do a traceback if caller has dev role, or we are running in debug mode.
        current_user = query_current_user()
        if (current_user and "dev" in current_user['roles']) or current_app.debug:
            # Same text the original assembled through cStringIO, built with
            # format_exception so it works on Python 2 and 3 alike.
            header = "%s: %s\n" % (type(e).__name__, e)
            ei = sys.exc_info()
            error["traceback"] = header + "".join(
                traceback.format_exception(ei[0], ei[1], ei[2]))
            error['description'] = str(e)
        else:
            error['description'] = "Internal Server Error"

        # The exception is logged out and picked up by Splunk or comparable tool.
        # The 'context_id' in the title enables quick cross referencing with the
        # response body below.
        log.exception(title)

        ret['status_code'] = 500
        ret['message'] = "Internal Server Error"
        error['code'] = 'internal_server_error'
    else:
        ret['status_code'] = e.code
        ret['message'] = e.name
        error['code'] = 'user_error' if e.code < 500 else 'server_error'
        error['description'] = e.description

        # Support for Flask Restful 'data' property in exceptions.
        if hasattr(e, 'data') and e.data:
            error.update(e.data)

            # Legacy field 'message'. If it's in the 'data' payload, rename the field
            # to 'description'.
            if 'message' in e.data:
                error['description'] = error.pop('message')

        if e.code >= 500:
            # It's a "soft" server error. Let's log it out.
            log.warning(title + " " + error['description'])

    return make_response(jsonify(ret), ret['status_code'])
def sign_csr(arg_userid, arg_csr_path, arg_crt_path):
    """
    Sign a user's CSR with the CA key and write the certificate to disk.

    Loads the CSR from ``arg_csr_path``, the CA certificate from
    ``CA_CRT_FILE`` and the CA private key from ``CA_KEY_FILE``, builds an
    X509 certificate valid from now for ``EXPIRY_PERIOD``, signs it with
    ``DIGEST`` and stores it PEM-encoded at ``arg_crt_path``.

    Fixes relative to the original: the "cannot access CA certificate" path
    logged an undefined ``e`` and now logs the reason from
    ``get_file_contents``; a CA key that fails to load now returns ``False``
    at that point; the output file is closed on write errors too; and the
    success message reports the CRT path rather than the CSR path.

    :param arg_userid: user id, used in log messages only
    :param arg_csr_path: path of the PEM certificate signing request
    :param arg_crt_path: path the signed certificate is written to
    :return: ``True`` on success, ``False`` on any failure (which is logged)
    """
    # csr = User CSR file in internal crypto format
    (result, contents) = get_file_contents(arg_csr_path)
    if not result:
        app.logger.error("sign_csr: cannot access CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, contents)
        return False
    try:
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, contents)
    except Exception as e:
        app.logger.error("sign_csr: load CSR {%s} for user {%s} failed, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False

    # CAcertificate = CA certificate in internal crypto format
    (result, contents) = get_file_contents(CA_CRT_FILE)
    if not result:
        # on failure the second element is what the sibling branches log as the reason
        app.logger.error("sign_csr: cannot access CA certificate {%s} for user {%s}, reason: {%s}", CA_CRT_FILE, arg_userid, contents)
        return False
    try:
        CAcertificate = crypto.load_certificate(crypto.FILETYPE_PEM, contents)
        if app.debug:
            app.logger.debug("sign_csr: CA cert subject = {%s}", CAcertificate.get_subject())
    except Exception as e:
        app.logger.error("sign_csr: load CA certificate {%s} for user {%s} failed, reason: {%s}", CA_CRT_FILE, arg_userid, repr(e))
        return False

    # CAprivatekey = CA private key in internal crypto format
    (result, contents) = get_file_contents(CA_KEY_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA private key {%s} for user {%s}, reason: {%s}", CA_KEY_FILE, arg_userid, contents)
        return False
    try:
        CAprivatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, contents)
    except Exception as e:
        app.logger.error("sign_csr: load CA private key {%s} for user {%s} failed, reason: {%s}", CA_KEY_FILE, arg_userid, repr(e))
        return False

    # Sign CSR, giving the CRT
    try:
        cert = crypto.X509()
        # NOTE(review): every certificate gets the same serial number (42);
        # serials are normally unique per CA -- confirm this is acceptable.
        cert.set_serial_number(42)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(EXPIRY_PERIOD)
        cert.set_issuer(CAcertificate.get_subject())
        subject = csr.get_subject()  # will log the subject later
        cert.set_subject(subject)
        cert.set_pubkey(csr.get_pubkey())
        cert.sign(CAprivatekey, DIGEST)
    except Exception as e:
        app.logger.error("sign_csr: Cannot sign CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False

    # Store signed CRT
    try:
        with open(arg_crt_path, "w") as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
            crt_file.flush()
            os.fsync(crt_file.fileno())
    except Exception as e:
        app.logger.error("sign_csr: Cannot store CRT {%s} for user {%s}, reason: {%s}", arg_crt_path, arg_userid, repr(e))
        return False

    # Success
    app.logger.info("sign_csr: Success with CRT {%s} for user {%s}, subject={%s}", arg_crt_path, arg_userid, subject)
    return True
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """
    Decorator that caches a view function's return value in redis.

    The cache is bypassed when ``unless()`` returns ``True`` or when the call
    carries a truthy ``nocache`` keyword argument (which is still passed on
    to the wrapped function). Cache failures are re-raised in debug mode;
    otherwise the call falls back to the wrapped function. If storing a
    freshly computed value fails, that value is returned as is -- the
    original invoked the wrapped function a second time.

    The wrapper exposes ``uncached`` (the wrapped function),
    ``cache_timeout`` and ``make_cache_key``.

    :param timeout: handed to ``redis_set`` as its ``timeout``.
        NOTE(review): the garbled original docstring mentions 3600 although
        the default here is ``None`` -- confirm what ``redis_set`` does with
        ``None``.
    :param key_prefix: a callable returning the key, a template containing
        ``%s`` (filled with the request URL and current user id), or a fixed
        key
    :param unless: optional callable; caching is skipped when it returns
        ``True``
    :return: the decorator
    """
    try:  # Python 3
        from urllib.parse import quote as _quote
    except ImportError:  # Python 2, where the original `urllib.quote` lives
        from urllib import quote as _quote

    def decorator(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            # callers can force a fresh computation with nocache=<truthy>
            if kwargs.get('nocache'):
                return f(*args, **kwargs)

            try:
                cache_key = decorated_function.make_cache_key(*args, **kwargs)
                cache_key = _quote(cache_key, safe='')
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    # the value is already computed; do not run f() again
                    return rv
            return rv

        def make_cache_key(*args, **kwargs):
            if callable(key_prefix):
                cache_key = key_prefix()
            elif '%s' in key_prefix:
                cache_key = key_prefix % (request.url + '_uid_' + str(current_user.get_id()))
            else:
                cache_key = key_prefix
            cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function

    return decorator
def redis_memoize(timeout=100, make_name=None, unless=None):
    """
    Decorator that memoizes a function's return value in redis, keyed by the
    function's module, name and call arguments.

    The cache is bypassed when ``unless()`` returns ``True`` or when the call
    carries a truthy ``nocache`` keyword argument (which is still passed on
    to the wrapped function). Cache failures are re-raised in debug mode;
    otherwise the call falls back to the wrapped function. If storing a
    freshly computed value fails, that value is returned as is -- the
    original invoked the wrapped function a second time.

    The wrapper exposes ``uncached`` (the wrapped function),
    ``cache_timeout`` and ``make_cache_key``.

    :param timeout: handed to ``redis_set`` as its ``timeout``; defaults to
        100
    :param make_name: optional name override for the cache key: a callable
        receiving the function name, or a string used as the name
    :param unless: optional callable; caching is skipped when it returns
        ``True``
    :return: the decorator
    """
    def decorator(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            # callers can force a fresh computation with nocache=<truthy>
            if kwargs.get('nocache'):
                return f(*args, **kwargs)

            try:
                cache_key = decorated_function.make_cache_key(make_name, args, kwargs)
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    # the value is already computed; do not run f() again
                    return rv
            return rv

        def make_cache_key(make_name, keyargs, keykwargs):
            fname = f.__name__
            if callable(make_name):
                fname = make_name(fname)
            if isinstance(make_name, str):
                fname = make_name
            alt_fname = '.'.join((f.__module__, fname))
            try:
                origin_str = "{0}{1}{2}".format(alt_fname, keyargs, keykwargs)
            except AttributeError:
                origin_str = "%s%s%s" % (alt_fname, keyargs, keykwargs)
            cache_key = hashlib.md5(origin_str.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function

    return decorator
def distributed_write(node=None):
    '''
    Store the results a node reports for its distributed queries.

    For every query guid in the request's ``queries`` mapping the matching
    PENDING ``DistributedQueryTask`` of this node is looked up. Results
    without such a task are logged and skipped. Otherwise each result row is
    stored as a ``DistributedQueryResult`` and the task is marked COMPLETE,
    or FAILED when the reported status for the guid is non-zero. Finally the
    node is added to the session and everything is committed. The response
    is always ``jsonify(node_invalid=False)``.
    '''
    data = request.get_json()

    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))

    queries = data.get('queries', {})
    statuses = data.get('statuses', {})

    for guid, results in queries.items():
        task = DistributedQueryTask.query.filter(
            DistributedQueryTask.guid == guid,
            DistributedQueryTask.status == DistributedQueryTask.PENDING,
            DistributedQueryTask.node == node,
        ).first()

        if not task:
            current_app.logger.error(
                "%s - Got result for distributed query not in PENDING "
                "state: %s: %s",
                request.remote_addr, guid, json.dumps(data)
            )
            continue

        # non-zero status indicates sqlite errors
        if statuses.get(guid, 0):
            current_app.logger.error(
                "%s - Got non-zero status code (%d) on distributed query %s",
                request.remote_addr, statuses.get(guid), guid
            )
            status = DistributedQueryTask.FAILED
        else:
            status = DistributedQueryTask.COMPLETE

        for columns in results:
            db.session.add(DistributedQueryResult(
                columns,
                distributed_query=task.distributed_query,
                distributed_query_task=task
            ))

        task.status = status
        db.session.add(task)

    # need to write last_checkin, last_ip on node
    db.session.add(node)
    db.session.commit()

    return jsonify(node_invalid=False)