我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.currentframe()。
def run_tests_if_main(show_coverage=False): """ Run tests in a given file if it is run as a script Coverage is reported for running this single test. Set show_coverage to launch the report in the web browser. """ local_vars = inspect.currentframe().f_back.f_locals if not local_vars.get('__name__', '') == '__main__': return # we are in a "__main__" os.chdir(ROOT_DIR) fname = str(local_vars['__file__']) _clear_imageio() _enable_faulthandler() pytest.main('-v -x --color=yes --cov imageio ' '--cov-config .coveragerc --cov-report html %s' % repr(fname)) if show_coverage: import webbrowser fname = os.path.join(ROOT_DIR, 'htmlcov', 'index.html') webbrowser.open_new_tab(fname)
def __ImportFrom(module_xname, xname): """ Import a specified name from a python module into the global namespace. This can be accomplished programatically and inside a method or class. @param module_xname : name of the module to load x.y.z @param xname : symbol to import from loaded module @return method/class : imported item from loaded module """ try: module = __import__(module_xname,globals(),locals(), [xname]) except ImportError: return None try: module = getattr(module,xname) if xname: for x in range( len(inspect.stack()) ): inspect.currentframe(x).f_globals[xname]=module except AttributeError as e: module=None return module
def produce(self, topic, doc, payload): """ Produce a new event. :param topic: The topic of the produced event. :param doc: The document to which the event belongs. :param payload: The file pointer beloning to the document. :type topic: ``str`` :type doc: ``gransk.core.Document`` :type payload: ``file`` """ caller = inspect.currentframe().f_back.f_locals['self'].__module__ listeners = self.listeners.get(topic, []) filename = os.path.basename(doc.path) for listener, callback in listeners: LOGGER.debug('[%s] %s -> %s (%s)', topic, caller, listener, filename) callback(doc, payload) if len(listeners) == 0: LOGGER.debug('[%s] %s -> no listeners (%s)', topic, caller, filename)
def _find_frame_by_self(clz): """ Find the first frame on stack in which there is local variable 'self' of type clz. """ frame = inspect.currentframe() while frame: self = frame.f_locals.get('self') if isinstance(self, clz): return frame frame = frame.f_back return None
def step_listener(self, connection, event, fd): #print('%s(%s, %s)' % # (inspect.currentframe().f_code.co_name,connection,event_str(event))) if event & ERRMASK: raise Exception('INTERNAL ERROR. LISTENER NEVER DIES') elif event & INMASK: new = self.accept(connection) if not new: # happens if peer hangs up during accept or the control is # rejecting new connections return self.pollable(new.fileno(), new, OUTMASK) self.accepting.append(new) # ignore all events for the same file descriptor in the current step # of the main loop. the OS may reuse descriptors aggressively and so # the events list may include POLLNVAL for the same descriptor. we # don't need to handle such a POLLNVAL event because that connection # is replaced (and GCed) by the call to self.pollable() above. self.unpend.append(new.fileno())
def step_accepting(self, connection, event, fd): #print('%s(%s, %s)' % # (inspect.currentframe().f_code.co_name,connection,event_str(event))) if event & ERRMASK: #print('%s %d close accepting %d %d %s' % ( # self.proc_name,os.getpid(),fd,connection.port,event_str(event))) self.accepting.remove(connection) connection.close() self.unpollable(fd) elif event & OUTMASK: self.accepting.remove(connection) salt = make_salt() try: connection.put(CHALLENGE + salt) self.authenticating[connection] = salt self.pollable(fd, connection, INMASK) except ConnectionClosed: #print('%s %d peer closed accepting %d %d OUT' % ( # self.proc_name, os.getpid(), fd, connection.port)) connection.close() self.unpollable(fd)
def __init__(self, *msg): message = "" for arg in msg: message += str(arg) try: ln = sys.exc_info()[-1].tb_lineno file = traceback.extract_tb(sys.exc_info()[-1]) if isinstance(file, list): file = file[0].filename except AttributeError: ln = inspect.currentframe().f_back.f_lineno file = inspect.getframeinfo(inspect.currentframe().f_back).filename if file is not None: file = re.compile("[\\\/]+").split(file) file = file[-1] self.args = "Error ({3}:{1}): {2}".format(type(self), ln, message, file), sys.exit(self) # def __init__(self, *args, **kwargs): # RuntimeError.__init__(self, args, kwargs)
def guess_app_stacklevel(start=1): """ try to guess stacklevel for application warning. looks for first frame not part of passlib. """ frame = inspect.currentframe() count = -start try: while frame: name = frame.f_globals.get('__name__', "") if name.startswith("passlib.tests.") or not name.startswith("passlib."): return max(1, count) count += 1 frame = frame.f_back return start finally: del frame
def configure_parser(parser): parser.description = "Run a migration script. These can only be run forward, not in reverse." # Inspect the local directory for which migration modules the user can run. local_files = os.listdir( os.path.dirname( inspect.getfile( inspect.currentframe() ))) migration_files = [f for f in local_files if re.match('^\d{4}.*\.py$', f)] migration_names = [m.rstrip('.py') for m in migration_files] parser.add_argument( 'migration_name', choices=migration_names, help="The name of the migration script you want to run." )
def new_sslwrap( sock, server_side=False, keyfile=None, certfile=None, cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23, ca_certs=None, ciphers=None ): context = __ssl__.SSLContext(ssl_version) context.verify_mode = cert_reqs or __ssl__.CERT_NONE if ca_certs: context.load_verify_locations(ca_certs) if certfile: context.load_cert_chain(certfile, keyfile) if ciphers: context.set_ciphers(ciphers) caller_self = inspect.currentframe().f_back.f_locals['self'] return context._wrap_socket(sock, server_side=server_side, ssl_sock=caller_self) # Re-add sslwrap to Python 2.7.9+
def get_beard_config(config_file="../../config.yml"): """Attempts to load a yaml file in the beard directory. NOTE: The file location should be relative from where this function is called. """ # Sometimes external libraries change the logging level. This is not # acceptable, so we assert after importing a beard that it has not changed. logger_level_before = logger.getEffectiveLevel() logging.debug("logging level: {}".format(logger.getEffectiveLevel())) callers_frame = inspect.currentframe().f_back logger.debug("This function was called from the file: " + callers_frame.f_code.co_filename) base_path = os.path.dirname(callers_frame.f_code.co_filename) config = yaml.safe_load(open(os.path.join(base_path, config_file))) logging.debug("logging level: {}".format(logger.getEffectiveLevel())) logger_level_after = logger.getEffectiveLevel() assert logger_level_before == logger_level_after, \ "Something has changed the logger level!" return config
def get_current_request(): """Return the current request by heuristically looking it up from stack """ frame = inspect.currentframe() while frame is not None: request = getattr(frame.f_locals.get('self'), 'request', None) if request is not None: return request elif isinstance(frame.f_locals.get('self'), RequestHandler): return frame.f_locals['request'] # if isinstance(frame.f_locals.get('self'), RequestHandler): # return frame.f_locals.get('self').request # elif IView.providedBy(frame.f_locals.get('self')): # return frame.f_locals['request'] frame = frame.f_back raise RequestNotFound(RequestNotFound.__doc__)
def __init__(self, **kwargs): # TODO This is a good first cut for debugging info, but it would be nice to # TODO be able to reliably walk the stack back to user code rather than just # TODO back past this constructor super(DebugInfo, self).__init__(**kwargs) frame = None try: frame = inspect.currentframe() while frame.f_locals.get('self', None) is self: frame = frame.f_back while frame: filename, lineno, function, code_context, index = inspect.getframeinfo( frame) if -1 == filename.find('ngraph/op_graph'): break frame = frame.f_back self.filename = filename self.lineno = lineno self.code_context = code_context finally: del frame
def loadatlas(r=5): """Load the atlas from the brainpipe module """ B3Dpath = dirname( abspath(join(getfile(currentframe()), '..', '..', '..', 'atlas'))) # Load talairach atlas : with open(B3Dpath + '/atlas/labels/talairach_atlas.pickle', "rb") as f: TAL = pickle.load(f) label = TAL['label'] strGM = ['No Gray Matter found within +/-'+str(r)+'mm'] label = concat([label, DataFrame({'hemisphere': [strGM], 'lobe':[ strGM], 'gyrus':[strGM], 'matter':[strGM], 'brodmann':[ 0]})]) label = label.set_index([list(n.arange(label.shape[0]))]) return TAL['hdr'], TAL['mask'], TAL['gray'], label
def async_calling_function(callFrame): def calling_functionn(__function): """ View Calling Function to debug """ def wrapper(*args, **kwargs): cur_frame = inspect.currentframe() cal_frame = inspect.getouterframes(cur_frame, callFrame) stack = [] for t in cal_frame: if 'Yu-gi' in t[1]: stack.append(t[3]) logger.debug("Called by: " + str(stack)) return __function(*args, **kwargs) return wrapper return calling_functionn
def __advice_stack_frame_protection(self, frame): """ Overriding of this is only permitted if and only if your name is Megumin and you have a pet/familiar named Chomusuke. """ if frame is None: logger.debug( 'currentframe() returned None; frame protection disabled') return f_back = frame.f_back while f_back: if f_back.f_code is self.handle.__code__: raise RuntimeError( "indirect invocation of '%s' by 'handle' is forbidden" % frame.f_code.co_name, ) f_back = f_back.f_back
def getFunctionCall(idx=1): """Returns the name of function or method that was called idx frames outwards. .. note:: ``idx=0`` will of course return ``getFunctionCall``. Args: idx (int): Steps outwards. Returns: str: Name of function or method. """ frame = inspect.currentframe() try: return inspect.getouterframes(frame)[idx][3] except IndexError: return ""
def shader_string(body, glsl_version='450 core'): """ Call this method from a function that defines a literal shader string as the "body" argument. Dresses up a shader string in three ways: 1) Insert #version at the top 2) Insert #line number declaration 3) un-indents The line number information can help debug glsl compile errors. The version string needs to be the very first characters in the shader, which can be distracting, requiring backslashes or other tricks. The unindenting allows you to type the shader code at a pleasing indent level in your python method, while still creating an unindented GLSL string at the end. """ line_count = len(body.split('\n')) line_number = inspect.currentframe().f_back.f_lineno + 1 - line_count return """\ #version %s %s """ % (glsl_version, shader_substring(body, stack_frame=2))
def get_current_request() -> IRequest: """ Return the current request by heuristically looking it up from stack """ try: task_context = aiotask_context.get('request') if task_context is not None: return task_context except (ValueError, AttributeError, RuntimeError): pass # fallback frame = inspect.currentframe() while frame is not None: request = getattr(frame.f_locals.get('self'), 'request', None) if request is not None: return request elif isinstance(frame.f_locals.get('request'), Request): return frame.f_locals['request'] frame = frame.f_back raise RequestNotFound(RequestNotFound.__doc__)
def command(fn, name=None): """Decorator for functions that should be exposed as commands.""" module = sys.modules[fn.__module__] if name is None: name = fn.__name__ if asyncio.iscoroutinefunction(fn if inspect.isfunction(fn) else fn.__func__ if inspect.ismethod(fn) else fn.__call__): # Get the actual function for coroutine check @functools.wraps(fn) async def wrapper(*args, **kwargs): try: frame = inspect.currentframe() ctx = frame.f_back.f_locals['ctx'] return await fn(ctx, *args, **kwargs) finally: del frame else: @functools.wraps(fn) def wrapper(*args, **kwargs): try: frame = inspect.currentframe() ctx = frame.f_back.f_locals['ctx'] return fn(ctx, *args, **kwargs) finally: del frame vars(module).setdefault('commands', {})[fn.__name__] = wrapper return wrapper
def is_inside_recursive_test_call(): """Test if a function is running from a call to du.test().""" frame = inspect.currentframe() count_test = 0 while frame: # Test breaking condition. if count_test >= 1: return True # Find if there is a breaking condition and update the value. test_function = frame.f_locals.get('test') if (hasattr(test_function, '_testMethodName')) and ( test_function._testMethodName == 'test_module_level_test_calls'): count_test += 1 # Go to next frame. frame = frame.f_back return False
def is_inside_unittest(): """Test if a function is running from unittest. This function will help freezing the __setattr__ freezing when running inside a unittest environment. """ frame = inspect.currentframe() # Calls from unittest discover have the following properties: # 1) Its stack is longer # 2) It contains arguments ('argv', 'pkg_name', 'unittest', etc) for # instance: a key value pair in the format: # ('argv', ['python3 -m unittest', 'discover', '-vvv', '.']) key = 'argv' # this may be error prone... value = 'unittest' while frame: frame_argv = frame.f_locals.get(key) if frame_argv and value in ''.join(frame_argv): return True frame = frame.f_back return False
def REPIC(obToPrint): '''REPIC stands for Read, Evaluate, Print In Comment. Call this function with an object obToPrint and it will rewrite the current file with the output in the comment in a line after this was called.''' cf = inspect.currentframe() callingFile = inspect.getfile(cf.f_back) callingLine = cf.f_back.f_lineno # print 'Line I am calling REPIC from:', callingLine for line in fileinput.input(callingFile, inplace=1): if callingLine == fileinput.filelineno(): # Make results, but get rid of newlines in output since that will break the comment: resultString = '#OUTPUT: ' + str(obToPrint).replace('\n','\\n') +'\n' writeIndex = line.rfind('\n') # Watch out for last line without newlines, there the end is just the line length. if '\n' not in line: writeIndex = len(line) # Replace old output and/or any comments: if '#' in line: writeIndex = line.rfind('#') output = line[0:writeIndex] + resultString else: output = line # If no REPIC, then don't change the line. sys.stdout.write(output)
def execute_only_once(): """ Each called in the code to this function is guranteed to return True the first time and False afterwards. Returns: bool: whether this is the first time this function gets called from this line of code. Example: .. code-block:: python if execute_only_once(): # do something only once """ f = inspect.currentframe().f_back ident = (f.f_code.co_filename, f.f_lineno) if ident in _EXECUTE_HISTORY: return False _EXECUTE_HISTORY.add(ident) return True
def get_linenumber(self): '''?????????''' cf = currentframe() return "run_line: file %s line %s " % (self.file_path, cf.f_back.f_back.f_lineno)
def run(self): # Explicit request for old-style install? Just do it if self.old_and_unmanageable or self.single_version_externally_managed: return orig.install.run(self) if not self._called_from_setup(inspect.currentframe()): # Run in backward-compatibility mode to support bdist_* commands. orig.install.run(self) else: self.do_egg_install()
def __ImportModule( module_name, asName=None ): """ Import a python module as needed into the global namespace. This can be accomplished programatically and inside a method or class. @param module_name : name of the module to load x.y.z @return module : python module object that was loaded """ module = __import__(module_name) for layer in module_name.split('.')[1:]: module = getattr(module,layer) if asName: for x in range( len(inspect.stack()) ): inspect.currentframe(x).f_globals[asName]=module return module
def mycallersname(): """Returns the name of the caller of the caller of this function (hence the name of the caller of the function in which "mycallersname()" textually appears). Returns None if this cannot be determined.""" # http://docs.python.org/library/inspect.html#the-interpreter-stack import inspect frame = inspect.currentframe() if not frame: return None frame_,filename_,lineno_,funname,linelist_,listi_ = ( inspect.getouterframes(frame)[2]) return funname
def call_contextual_template(template): # this is the magic line frame = inspect.currentframe().f_back # again, we don't care about efficiency, it's not the point here d = dict(frame.f_globals) d.update(frame.f_locals) return call_template(template,d)
def ipsh(): ipshell = InteractiveShellEmbed(config=cfg, banner1=banner_msg, exit_msg=exit_msg) frame = inspect.currentframe().f_back msg = 'Stopped at {0.f_code.co_filename} at line {0.f_lineno}'.format(frame) # Go back one level! # This is needed because the call to ipshell is inside the function ipsh() ipshell(msg, stack_depth=2)
def current_line_number(): """Returns the current line number in our program. :return: current line number :rtype: int """ import inspect return inspect.currentframe().f_back.f_lineno
def f(format_string): caller_frame = inspect.currentframe().f_back caller_globals = caller_frame.f_globals caller_locals = caller_frame.f_locals lf = LiteralFormatter() return lf.format(format_string, caller_globals, caller_locals)
def _prepare_debuglog_message(message, caller_level=2): # Por si acaso se le mete una cadena de tipo str, # este módulo es capaz de detectar eso y convertirla a UTF8 if type(message) == str: message = unicode(message, "UTF-8") # Hora now = timezone.now() # Contexto desde el que se ha llamado curframe = inspect.currentframe() # Objeto frame que llamó a dlprint calframes = inspect.getouterframes(curframe, caller_level) caller_frame = calframes[2][0] caller_name = calframes[2][3] # Ruta del archivo que llamó a dlprint filename_path = caller_frame.f_code.co_filename filename = filename_path.split("/")[-1] # Obtención del mensaje return u"DjangoVirtualPOS: {0} {1} \"{2}\" at {3}:{5} in {6} ({4}:{5})\n".format( now.strftime("%Y-%m-%d %H:%M:%S %Z"), settings.DOMAIN, message, filename, filename_path, caller_frame.f_lineno, caller_name ) # Prints the debug message
def fully_aligned_distance(p1, p2): """compares each one to its' best mapping""" word2word = align_yields(p1, p2) nodes1 = set(node for node in p1.layer(layer1.LAYER_ID).all if is_comparable(node)) nodes2 = set(node for node in p2.layer(layer1.LAYER_ID).all if is_comparable(node)) first = align_nodes(nodes1, nodes2, word2word) word2word = reverse_mapping(word2word) second = align_nodes(nodes2, nodes1, word2word) count1 = len(set((i, j) for (i, j) in first.items() if compare(i, j))) count2 = len(set((i, j) for (i, j) in second.items() if compare(i, j))) print(inspect.currentframe().f_code.co_name, " returns ", two_sided_f(count1, count2, len(nodes1), len(nodes2))) return two_sided_f(count1, count2, len(nodes1), len(nodes2))
def token_distance(p1, p2, map_by=buttom_up_by_levels_align): """compares considering only the main relation of each node""" count1 = token_matches(p1, p2, map_by) count2 = token_matches(p2, p1, map_by) nodes1 = set(node for node in p1.layer(layer1.LAYER_ID).all if is_comparable(node) and label(node) in MAIN_RELATIONS) nodes2 = set(node for node in p2.layer(layer1.LAYER_ID).all if is_comparable(node) and label(node) in MAIN_RELATIONS) print(inspect.currentframe().f_code.co_name) print("counts", count1, count2) print("lens", len(nodes1), len(nodes2)) print(two_sided_f(count1, count2, len(nodes1), len(nodes2))) return two_sided_f(count1, count2, len(nodes1), len(nodes2))
def get_oplog_cursor_since(self, caller, ts=None): frame = getframeinfo(currentframe().f_back) comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno) if not ts: ts = self.get_oplog_tail_ts() query = {'ts': {'$gte': ts}} logging.debug("Querying oplog on %s with query: %s" % (self.uri, query)) # http://api.mongodb.com/python/current/examples/tailable.html return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)
def __enter__(self): parent = inspect.currentframe().f_back try: if parent.f_code.co_flags & inspect.CO_NEWLOCALS: raise RuntimeError('timers only work when invoked at the module/script level') self._with_start = parent.f_lasti finally: del parent gc_enabled = gc.isenabled() gc.disable() self._gc_enabled = gc_enabled self._start_time = self.time_function() return self