我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用inspect.getsourcefile()。
def linkcode_resolve(domain, info):
    """Build the GitHub URL of the source behind a documented object.

    Returns ``None`` unless ``domain`` is ``'py'`` and ``info['module']`` is
    non-empty.  When the precise file and line range cannot be worked out,
    the URL falls back to the module path derived from ``info['module']``.
    """
    if domain != 'py' or not info['module']:
        return None

    def _locate():
        # Technique taken from numpy:
        # https://github.com/numpy/numpy/blob/master/doc/source/conf.py#L286
        target = sys.modules[info['module']]
        for attr in info['fullname'].split('.'):
            target = getattr(target, attr)

        import inspect
        import os

        source_file = inspect.getsourcefile(target)
        package_dir = os.path.dirname(npdl.__file__)
        relative = os.path.relpath(source_file, start=package_dir)
        body, first = inspect.getsourcelines(target)
        return relative, first, first + len(body) - 1

    try:
        filename = 'npdl/%s#L%d-L%d' % _locate()
    except Exception:
        filename = info['module'].replace('.', '/') + '.py'

    if 'dev' in release:
        tag = 'master'
    else:
        tag = 'v' + release
    return "https://github.com/oujago/NumpyDL/blob/%s/%s" % (tag, filename)
def get_src_path(self, obj, append_base=True):
    """Create a source path string with line info for use as a markdown link.

    Args:
        obj: Object whose source file is looked up with ``getsourcefile``.
        append_base: When true, prefix the result with ``self.github_link``.

    Returns:
        ``<src_root>/<path below src_root>``, followed by ``#L<n>`` when
        ``self.get_line_no(obj)`` returns a line number.

    Raises:
        ValueError: If ``self.src_root + "/"`` does not occur in the path.
    """
    import posixpath

    path = getsourcefile(obj)
    # getsourcefile() may return None; treat that like a path outside src_root.
    if not path or self.src_root not in path:
        # this can happen with e.g. inlinefunc-wrapped functions
        if hasattr(obj, "__module__"):
            path = "%s.%s" % (obj.__module__, obj.__name__)
        else:
            path = obj.__name__
        path = path.replace(".", "/")
    _, post = path.rsplit(self.src_root + "/", 1)

    lineno = self.get_line_no(obj)
    lineno = "" if lineno is None else "#L{}".format(lineno)

    path = self.src_root + "/" + post + lineno
    if append_base:
        # The result is a URL, so always join with "/" regardless of platform.
        path = posixpath.join(self.github_link, path)
    return path
def get_src_path(self, obj, append_base=True):
    """Return the link target (source path plus line anchor) for ``obj``.

    The path starts at ``self.src_root``; ``#L<n>`` is appended when
    ``self.get_line_no(obj)`` yields a number, and ``self.github_link`` is
    prepended when ``append_base`` is true.
    """
    location = getsourcefile(obj)
    root = self.src_root
    if root not in location:
        # Seen with e.g. inlinefunc-wrapped functions: fall back to the
        # dotted name turned into a slash-separated path.
        if hasattr(obj, "__module__"):
            dotted = "%s.%s" % (obj.__module__, obj.__name__)
        else:
            dotted = obj.__name__
        location = dotted.replace(".", "/")
    _, tail = location.rsplit(root + "/", 1)

    line = self.get_line_no(obj)
    anchor = "#L{}".format(line) if line is not None else ""
    result = root + "/" + tail + anchor
    if not append_base:
        return result
    return os.path.join(self.github_link, result)
def validate_modules(self):
    """Warn about ``p_*`` functions that are defined more than once.

    Every module in ``self.modules`` has its source scanned line by line.
    The first line on which a ``def p_<name>(`` appears is remembered; each
    later definition of the same name is reported through
    ``self.log.warning``.  Modules whose source cannot be read are skipped.
    """
    # Match def p_funcname(
    fre = re.compile(r'\s*def\s+(p_[a-zA-Z_0-9]*)\(')

    for module in self.modules:
        try:
            lines, _ = inspect.getsourcelines(module)
        except IOError:
            # No source available for this module: nothing to check.
            continue

        counthash = {}
        for linen, line in enumerate(lines, 1):
            m = fre.match(line)
            if not m:
                continue
            name = m.group(1)
            prev = counthash.get(name)
            if not prev:
                counthash[name] = linen
            else:
                filename = inspect.getsourcefile(module)
                self.log.warning('%s:%d: Function %s redefined. Previously defined on line %d',
                                 filename, linen, name, prev)

# Get the start symbol
def linkcode_resolve(domain, info):
    """Return the GitHub (master branch) URL for a documented object.

    ``None`` is returned for non-Python domains and for entries without a
    module.  If the exact source location cannot be determined, the link
    points at the module file derived from ``info['module']``.
    """
    def find_source():
        import braindecode
        # Same approach as numpy's doc configuration:
        # https://github.com/numpy/numpy/blob/master/doc/source/conf.py#L286
        names = info['fullname'].split('.')
        obj = sys.modules[info['module']]
        for name in names:
            obj = getattr(obj, name)
        import inspect
        import os
        root = os.path.dirname(braindecode.__file__)
        path = os.path.relpath(inspect.getsourcefile(obj), start=root)
        source, start = inspect.getsourcelines(obj)
        end = start + len(source) - 1
        return path, start, end

    if domain != 'py':
        return None
    if not info['module']:
        return None

    try:
        filename = 'braindecode/%s#L%d-L%d' % find_source()
    except Exception:
        filename = info['module'].replace('.', '/') + '.py'
    return "https://github.com/robintibor/braindecode/blob/master/%s" % filename
def linkcode_resolve(domain, info):
    """Map a documented Python object to its source URL on GitHub.

    Yields ``None`` when ``domain`` is not ``'py'`` or the module name is
    empty.  The branch/tag part of the URL is ``master`` when ``release``
    contains ``'dev'`` and ``'v' + release`` otherwise.
    """
    python_object = domain == 'py' and info['module']
    if not python_object:
        return None

    def source_span():
        # Based on numpy's implementation:
        # https://github.com/numpy/numpy/blob/master/doc/source/conf.py#L286
        node = sys.modules[info['module']]
        for piece in info['fullname'].split('.'):
            node = getattr(node, piece)
        import inspect
        import os
        located = inspect.getsourcefile(node)
        located = os.path.relpath(located, start=os.path.dirname(yadll.__file__))
        code_lines, begin = inspect.getsourcelines(node)
        return located, begin, begin + len(code_lines) - 1

    try:
        filename = 'yadll/%s#L%d-L%d' % source_span()
    except Exception:
        filename = info['module'].replace('.', '/') + '.py'
    tag = ('v' + release) if 'dev' not in release else 'master'
    return "https://github.com/yadll/yadll/blob/%s/%s" % (tag, filename)

# -- Options for HTML output ----------------------------------------------
# Read the docs style:
def run(self):
    """Poll watched files until ``self.status`` becomes truthy.

    Records the modification time of every source file behind the modules
    in ``sys.modules``.  While ``self.status`` is falsy, it is set to 3 when
    a recorded file disappears or gets a newer mtime, to 2 when
    ``self.lockfile`` is missing and to 1 when the lockfile's mtime is older
    than ``self.interval + 5`` seconds.  Afterwards the main thread is
    interrupted unless the status is 5.
    """
    path_exists = os.path.exists

    def modified_at(path):
        return os.stat(path).st_mtime

    watched = dict()
    for mod in sys.modules.values():
        try:
            source = inspect.getsourcefile(mod)
            if source and path_exists(source):
                watched[source] = modified_at(source)
        except TypeError:
            # No source file for this entry.
            pass

    while not self.status:
        for source, recorded in watched.iteritems():
            if not path_exists(source) or modified_at(source) > recorded:
                self.status = 3
        if not path_exists(self.lockfile):
            self.status = 2
        elif modified_at(self.lockfile) < time.time() - self.interval - 5:
            self.status = 1
        if not self.status:
            time.sleep(self.interval)

    if self.status != 5:
        thread.interrupt_main()
def linkcode_resolve(domain, info):
    """Produce the GitHub tree URL for the object described by ``info``.

    Only entries of the ``'py'`` domain with a non-empty module are linked;
    everything else gives ``None``.  On any failure to locate the precise
    lines, the module's own file path is linked instead.
    """
    if domain != 'py' or not info['module']:
        return None

    def _span():
        current = sys.modules[info['module']]
        for segment in info['fullname'].split('.'):
            current = getattr(current, segment)
        import inspect
        import os
        absolute = inspect.getsourcefile(current)
        base = os.path.dirname(nnbuilder.__file__)
        text, top = inspect.getsourcelines(current)
        bottom = top + len(text) - 1
        return os.path.relpath(absolute, start=base), top, bottom

    try:
        located = _span()
    except Exception:
        filename = info['module'].replace('.', '/') + '.py'
    else:
        filename = 'nnbuilder/%s#L%d-L%d' % located
    return "https://github.com/aeloyq/NNBuilder/tree/master/%s" % (filename)
def interpret_fraction(f):
    """Return a rebuilt version of ``f`` after an AST transformation.

    The source of ``f`` is parsed, passed through ``FractionInterpreter``,
    compiled and executed in a scratch namespace.  The object that the
    executed code binds to ``f.__name__`` is returned.

    Args:
        f: A plain Python function whose source is available to ``inspect``.
    """
    assert inspect.isfunction(f)

    f_ast = ast.parse(inspect.getsource(f))
    new_ast = FractionInterpreter().visit(f_ast)
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')

    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    fake_locals = {}
    exec(co, None, fake_locals)
    return fake_locals[f.__name__]
def mess_control(f, *, debug=0):
    """Return a rebuilt version of ``f`` after the ``ControlMess`` transform.

    The source of ``f`` is parsed, rewritten by ``ControlMess``, compiled and
    executed in a scratch namespace.  The object that the executed code
    binds to ``f.__name__`` is returned.

    Args:
        f: A plain Python function whose source is available to ``inspect``.
        debug: When truthy, dump the AST before and after the rewrite.
    """
    assert inspect.isfunction(f)

    f_ast = ast.parse(inspect.getsource(f))
    if debug:
        print("AST:", ast.dump(f_ast))

    new_ast = ControlMess().visit(f_ast)
    if debug:
        print('NEW AST:', ast.dump(new_ast))

    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')

    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    fake_locals = {}
    exec(co, None, fake_locals)
    return fake_locals[f.__name__]
def omap(fun, arglist):
    """Write job files for ``fun`` over ``arglist`` and collect the results.

    Copies the module defining ``fun`` into a fresh working directory,
    pickles ``arglist``, writes the formatted slurm script, creates the run
    signal file and then waits until that file no longer exists before
    returning ``collect_results(jobname)``.
    """
    # NOTE(review): the original indentation was lost; the extent of the
    # ``with temp_dir`` block below is reconstructed -- confirm.
    print(banner("Starting omap"))
    task_count = len(arglist)
    jobname = str(npr.RandomState().randint(10**12))
    working_dir = path.join(root_working_dir, jobdir(jobname))
    module_path = path.join(os.getcwd(), inspect.getsourcefile(fun))
    module_name = inspect.getmodulename(module_path)
    run_signal_path = path.join('..', run_signal(jobname))
    slurm_str = slurm_template.format(
        jobname=jobname,
        N_tasks=task_count,
        other_options=slurm_options,
        module_name=module_name,
        fun_name=fun.__name__,
    )

    with temp_dir(working_dir):
        shutil.copy(module_path, ".")
        with open(arg_fname, 'w') as arg_file:
            pickle.dump(arglist, arg_file)
        with open(slurm_fname, 'w') as slurm_file:
            slurm_file.write(slurm_str)
        # An empty file signals that the job is ready to run.
        with open(run_signal_path, 'w'):
            pass
        print("Submitting {0} tasks (output in {1})".format(task_count, working_dir))
        while path.exists(run_signal_path):
            time.sleep(1)
    print("Tasks submitted")

    return collect_results(jobname)
def get_src_path(obj, src_root='tefla', append_base=True):
    """Create a source path string with line info for use as a markdown link.

    Args:
        obj: Object whose source file is looked up with ``getsourcefile``.
        src_root: Directory name the returned path starts from.
        append_base: When true, prefix the path with the GitHub blob URL.

    Returns:
        ``<src_root>/<path below src_root>``, followed by ``#L<n>`` when
        ``get_line_no(obj)`` returns a line number.  If ``src_root + "/"``
        does not occur in the path, the part below ``src_root`` is empty.
    """
    import posixpath

    path = getsourcefile(obj)
    # getsourcefile() may return None; treat that like a path outside src_root.
    if not path or src_root not in path:
        # this can happen with e.g. inlinefunc-wrapped functions
        if hasattr(obj, "__module__"):
            path = "%s.%s" % (obj.__module__, obj.__name__)
        else:
            path = obj.__name__
        path = path.replace(".", "/")

    try:
        _, post = path.rsplit(src_root + "/", 1)
    except ValueError:
        # src_root + "/" is not part of the path: nothing to unpack.
        post = ''

    lineno = get_line_no(obj)
    lineno = "" if lineno is None else "#L{}".format(lineno)

    path = src_root + "/" + post + lineno
    if append_base:
        # The result is a URL, so always join with "/" regardless of platform.
        path = posixpath.join(
            'https://github.com/openagi/tefla/blob/master', path)
    return path
def local_settings_update(self, changes):
    """Update local_settings.py with new content created according to the changes parameter.

    The changes parameter should be a list generated by check_modules().
    The current source of ``local_settings`` is kept as a backup; if
    reloading the rewritten module raises ``ImportError`` the backup is
    written back, otherwise the cached django settings are reset.

    Raises:
        SystemError: If ``local_settings`` is falsy.
    """
    if not local_settings:
        raise SystemError('Missing local_settings.py!')

    logger.info('Creating new local_settings.py with following changes: %s', self._show_changes(changes))
    target = inspect.getsourcefile(local_settings)
    data = self._local_settings_new(changes)
    backup = inspect.getsource(local_settings)
    # Logger.warn is a deprecated alias of Logger.warning.
    logger.warning('Updating %s', target)
    self._save_file(target, data)

    try:
        reload_module(local_settings)
    except ImportError as e:
        logger.exception(e)
        logger.warning('Restoring %s from backup', target)
        self._save_file(target, backup)
    else:
        # Force reloading of django settings
        settings._wrapped = empty
def validate_modules(self):
    """Report ``p_*`` functions that appear more than once in a module.

    Scans the source of each module in ``self.modules`` (skipping those
    whose source raises ``IOError``) and logs a warning for every repeated
    ``def p_<name>(`` together with the line of its first definition.
    """
    # Pattern for: def p_funcname(
    pattern = re.compile(r'\s*def\s+(p_[a-zA-Z_0-9]*)\(')

    for module in self.modules:
        try:
            source_lines = inspect.getsourcelines(module)[0]
        except IOError:
            continue

        first_seen = {}
        for index, text in enumerate(source_lines):
            found = pattern.match(text)
            if not found:
                continue
            current = index + 1
            rule = found.group(1)
            earlier = first_seen.get(rule)
            if earlier:
                filename = inspect.getsourcefile(module)
                self.log.warning('%s:%d: Function %s redefined. Previously defined on line %d',
                                 filename, current, rule, earlier)
            else:
                first_seen[rule] = current

# Get the start symbol
def find_templates():
    """
    Load python modules from templates directory and get templates list

    The ``templates`` directory is looked up next to the file that defines
    this function.  Every ``*.py`` file in it (hidden files excluded) is
    imported, and each module-level name ending in ``templates`` is expected
    to hold ``(regex_text, data_func)`` pairs.

    :return: list of tuples (pairs):
        [(compiled regex, lambda regex_match: return message_data)]
    """
    # str.rstrip() removes a *set of characters*, not a suffix, so it must
    # not be used to drop a file name or an extension.
    package_directory = os.path.dirname(inspect.getsourcefile(lambda: 0))
    templates_directory = os.path.join(package_directory, 'templates')

    templates = []
    for template_file in os.listdir(templates_directory):
        if template_file.startswith('.') or not template_file.endswith('.py'):
            continue
        module_name = template_file[:-len('.py')]

        # Hack for dev development and disutils
        try:
            template_module = importlib.import_module(
                'templates.{}'.format(module_name))
        except ImportError:
            template_module = importlib.import_module(
                'ross.templates.{}'.format(module_name))

        # Iterate through items in template.
        # If a variable name ends with 'templates',
        # extend templates list with it.
        for name, content in template_module.__dict__.items():
            if name.endswith('templates'):
                for regex_text, data_func in content:
                    templates.append(
                        (re.compile(regex_text, re.IGNORECASE), data_func))
    return templates
def __init__(self, exc_type, exc_value, tb):
    """Record details of the frame referenced by traceback entry ``tb``.

    Stores line number, function name, locals/globals, the resolved source
    filename (as unicode), module name, loader and code object, plus the
    ``__traceback_hide__`` / ``__traceback_info__`` values found in the
    frame's locals.
    """
    frame = tb.tb_frame
    self.lineno = tb.tb_lineno
    self.function_name = frame.f_code.co_name
    self.locals = frame.f_locals
    self.globals = frame.f_globals

    source_name = inspect.getsourcefile(tb) or inspect.getfile(tb)
    if source_name[-4:] in ('.pyo', '.pyc'):
        # Point at the .py file rather than the compiled one.
        source_name = source_name[:-1]
    # if it's a file on the file system resolve the real filename.
    if os.path.isfile(source_name):
        source_name = os.path.realpath(source_name)
    self.filename = to_unicode(source_name, get_filesystem_encoding())

    self.module = self.globals.get('__name__')
    self.loader = self.globals.get('__loader__')
    self.code = tb.tb_frame.f_code

    # support for paste's traceback extensions
    self.hide = self.locals.get('__traceback_hide__', False)
    extra = self.locals.get('__traceback_info__')
    if extra is not None:
        try:
            extra = text_type(extra)
        except UnicodeError:
            extra = str(extra).decode('utf-8', 'replace')
    self.info = extra
def version_check_and_info(module):
    """Return either git info or standard module version if not a git repo.

    Args:
        module (module): python module object to get info for.

    Returns:
        dict: dictionary of info, with the module's source file stored under
        ``'source_path'``.
    """
    source = inspect.getsourcefile(module)

    def _gather():
        # Prefer repository information; fall back to the package version.
        try:
            repository = git.Repo(source, search_parent_directories=True)
        except git.InvalidGitRepositoryError:
            log.info('module %s not in a git repo, checking package version' %
                     module.__name__)
            return version_info(module)
        return git_info(repository)

    details = _gather()
    details['source_path'] = source
    return details
def __init__(self, lexicon_file="vader_lexicon.txt"):
    """Read ``lexicon_file`` from this module's directory and parse it.

    The raw file content is stored on ``self.lexicon_full_filepath`` and the
    result of ``self.make_lex_dict()`` on ``self.lexicon``.
    """
    # A lambda defined here resolves to the file containing this code.
    module_dir = dirname(abspath(getsourcefile(lambda: 0)))
    lexicon_path = join(module_dir, lexicon_file)
    with open(lexicon_path) as handle:
        content = handle.read()
    self.lexicon_full_filepath = content
    self.lexicon = self.make_lex_dict()
def getfslineno(obj):
    """ Return source location (path, lineno) for the given object.

    If the source cannot be determined return ("", -1)
    """
    try:
        code = py.code.Code(obj)
    except TypeError:
        code = None

    if code is not None:
        location = code.path
        first = code.firstlineno
        assert isinstance(first, int)
        return location, first

    # No code object could be built: fall back to the inspect module.
    try:
        fn = (py.std.inspect.getsourcefile(obj) or
              py.std.inspect.getfile(obj))
    except TypeError:
        return "", -1

    location = fn and py.path.local(fn) or None
    first = -1
    if location:
        try:
            first = findsource(obj)[1]
        except IOError:
            pass
    assert isinstance(first, int)
    return location, first

#
# helper functions
#
def getfslineno(obj):
    """ Return source location (path, lineno) for the given object.

    If the source cannot be determined return ("", -1)
    """
    import _pytest._code

    def _via_inspect():
        # Used when no Code object can be created for ``obj``.
        try:
            fn = (py.std.inspect.getsourcefile(obj) or
                  py.std.inspect.getfile(obj))
        except TypeError:
            return "", -1
        where = fn and py.path.local(fn) or None
        line = -1
        if where:
            try:
                _, line = findsource(obj)
            except IOError:
                pass
        assert isinstance(line, int)
        return where, line

    try:
        code = _pytest._code.Code(obj)
    except TypeError:
        return _via_inspect()

    fspath, lineno = code.path, code.firstlineno
    assert isinstance(lineno, int)
    return fspath, lineno

#
# helper functions
#
def linkcode_resolve(domain, info):
    """Return the GitHub URL (file and first line) of a documented object.

    ``None`` is returned when the entry is not a Python object with a
    module, when the object's module is not ``cupy`` or one of its
    submodules, or when no source file can be determined for it.
    """
    if domain != 'py' or not info['module']:
        return None

    on_latest = os.environ.get('READTHEDOCS_VERSION') == 'latest'
    tag = 'master' if on_latest else 'v{}'.format(__version__)

    # Import the object from module path
    target = _import_object_from_name(info['module'], info['fullname'])

    # Objects defined outside the cupy package are not linked.
    owner = inspect.getmodule(target)
    if owner is None:
        return None
    if owner.__name__ != 'cupy' and not owner.__name__.startswith('cupy.'):
        return None

    # Source file in which the object is defined.
    try:
        source_file = inspect.getsourcefile(target)
    except TypeError:
        # target is not a module, class, function, ..etc.
        return None

    # inspect can return None for cython objects
    if source_file is None:
        return None

    # First line of the definition.
    first_line = inspect.getsourcelines(target)[1]
    assert isinstance(first_line, six.integer_types)

    relative = _get_source_relative_path(os.path.realpath(source_file))
    return 'https://github.com/cupy/cupy/blob/{}/{}#L{}'.format(
        tag, relative, first_line)
def __init__(self, exc_type, exc_value, tb):
    """Collect information about the frame behind traceback entry ``tb``.

    Sets ``lineno``, ``function_name``, ``locals``, ``globals``,
    ``filename``, ``module``, ``loader``, ``code``, ``hide`` and ``info`` on
    the instance.
    """
    self.lineno = tb.tb_lineno
    self.function_name = tb.tb_frame.f_code.co_name
    self.locals = tb.tb_frame.f_locals
    self.globals = tb.tb_frame.f_globals

    path = inspect.getsourcefile(tb) or inspect.getfile(tb)
    if path.endswith(('.pyo', '.pyc')):
        # Drop the trailing letter to name the .py source.
        path = path[:-1]
    # if it's a file on the file system resolve the real filename.
    self.filename = os.path.realpath(path) if os.path.isfile(path) else path

    module_globals = self.globals
    self.module = module_globals.get('__name__')
    self.loader = module_globals.get('__loader__')
    self.code = tb.tb_frame.f_code

    # support for paste's traceback extensions
    frame_locals = self.locals
    self.hide = frame_locals.get('__traceback_hide__', False)
    note = frame_locals.get('__traceback_info__')
    if note is not None:
        try:
            note = text_type(note)
        except UnicodeError:
            note = str(note).decode('utf-8', 'replace')
    self.info = note
def __init__(self, exc_type, exc_value, tb):
    """Populate this object from the frame of traceback entry ``tb``.

    The source filename is resolved to its real path when it exists on disk
    and converted with ``to_unicode`` using the filesystem encoding.
    """
    current = tb.tb_frame
    code = current.f_code

    self.lineno = tb.tb_lineno
    self.function_name = code.co_name
    self.locals = current.f_locals
    self.globals = current.f_globals

    name = inspect.getsourcefile(tb)
    if not name:
        name = inspect.getfile(tb)
    for compiled_suffix in ('.pyo', '.pyc'):
        if name[-4:] == compiled_suffix:
            name = name[:-1]
            break
    # if it's a file on the file system resolve the real filename.
    if os.path.isfile(name):
        name = os.path.realpath(name)
    self.filename = to_unicode(name, sys.getfilesystemencoding())

    self.module = self.globals.get('__name__')
    self.loader = self.globals.get('__loader__')
    self.code = tb.tb_frame.f_code

    # support for paste's traceback extensions
    self.hide = self.locals.get('__traceback_hide__', False)
    details = self.locals.get('__traceback_info__')
    if details is not None:
        try:
            details = text_type(details)
        except UnicodeError:
            details = str(details).decode('utf-8', 'replace')
    self.info = details
def validate_module(self, module):
    """Log an error for every ``t_*`` rule defined twice in ``module``.

    Both ``def t_<name>(`` functions and ``t_<name> =`` assignments count as
    rule definitions.  ``self.error`` is set to ``True`` when a duplicate is
    found.
    """
    source_lines, start = inspect.getsourcelines(module)

    func_rule = re.compile(r'\s*def\s+(t_[a-zA-Z_0-9]*)\(')
    string_rule = re.compile(r'\s*(t_[a-zA-Z_0-9]*)\s*=')

    first_seen = {}
    for number, text in enumerate(source_lines, start + 1):
        found = func_rule.match(text) or string_rule.match(text)
        if not found:
            continue
        rule = found.group(1)
        earlier = first_seen.get(rule)
        if earlier:
            filename = inspect.getsourcefile(module)
            self.log.error('%s:%d: Rule %s redefined. Previously defined on line %d',
                           filename, number, rule, earlier)
            self.error = True
        else:
            first_seen[rule] = number

# -----------------------------------------------------------------------------
# lex(module)
#
# Build all of the regular expression rules from definitions in the supplied module
# -----------------------------------------------------------------------------
def traverse(roots, parent='', verbose=False):
    """
    Recursive call which also handles printing output.

    Prints ``-> <uri>`` for every node that has a method map.  In verbose
    mode, each method whose handler is not named ``method_not_allowed`` is
    also listed with the handler's source file and first line number.

    :param roots: The nodes to walk; each must provide ``method_map``,
        ``raw_segment`` and ``children``.
    :type roots: iterable
    :param parent: The parent uri path to the current iteration.
    :type parent: str
    :param verbose: If the output should be verbose.
    :type verbose: bool
    """
    for root in roots:
        uri = parent + '/' + root.raw_segment
        if root.method_map:
            print('->', uri)
            if verbose:
                for method, func in root.method_map.items():
                    # Unwrap partials first so name and source come from
                    # the real responder.
                    real_func = func.func if isinstance(func, partial) else func
                    name = getattr(func, '__name__', None) or real_func.__name__
                    if name == 'method_not_allowed':
                        continue
                    source_file = inspect.getsourcefile(real_func)
                    # getsourcelines() returns (lines, first_line_number).
                    source_line = inspect.getsourcelines(real_func)[1]
                    print('-->{0} {1}:{2}'.format(
                        method, source_file, source_line))
        if root.children:
            traverse(root.children, uri, verbose)
def update_md5(filenames):
    """Update our built-in md5 registry

    Computes the md5 digest of every file in ``filenames``, stores it in
    ``md5_data`` under the file's base name, and rewrites the ``md5_data``
    literal inside this module's own source file.  Exits with status 2 if
    the literal cannot be found.
    """
    import re
    import inspect

    for name in filenames:
        base = os.path.basename(name)
        with open(name, 'rb') as f:
            md5_data[base] = md5(f.read()).hexdigest()

    data = sorted(" %r: %r,\n" % it for it in md5_data.items())
    repl = "".join(data)

    srcfile = inspect.getsourcefile(sys.modules[__name__])
    # Read as text so the str pattern below can be applied to it.
    with open(srcfile, 'r') as f:
        src = f.read()

    match = re.search("\nmd5_data = {\n([^}]+)}", src)
    if not match:
        sys.stderr.write("Internal error!\n")
        sys.exit(2)

    src = src[:match.start(1)] + repl + src[match.end(1):]
    with open(srcfile, 'w') as f:
        f.write(src)
def func_namespace(func):
    """Generates a unique namespace for a function

    Objects exposing ``im_func`` or ``__func__`` are named after their class
    (``module.ClassName``); anything else is named ``sourcefile|name``.
    """
    owner = None
    if any(hasattr(func, attr) for attr in ('im_func', '__func__')):
        owner = im_class(func)
        func = im_func(func)

    if not owner:
        return '%s|%s' % (inspect.getsourcefile(func), func.__name__)
    return '%s.%s' % (owner.__module__, owner.__name__)
def _send_module(self):
    """Upload this object's class source file, then its name and config.

    The source file of ``self.__class__`` is posted to ``/module_update``,
    followed by a JSON payload with ``self.name`` and ``self._get_config()``
    to ``/module_update_info``.

    Raises:
        ModuleInitializationError: If the second response's ``status`` is
            not ``'ok'``.
    """
    # The context manager closes the file even when the upload raises.
    with open(inspect.getsourcefile(self.__class__)) as fd:
        self._post('/module_update', files={'file': fd})

    result = self._post('/module_update_info',
                        json={'name': self.name, 'config': self._get_config()})
    if result['status'] != 'ok':
        raise ModuleInitializationError(self, result['error'])
def _deprecated_method(method, cls, method_name, msg):
    """Show deprecation warning about a magic method definition.

    Uses warn_explicit to bind warning to method definition instead of
    triggering code, which isn't relevant.
    """
    warn_msg = "{classname}.{method_name} is deprecated in traitlets 4.1: {msg}".format(
        classname=cls.__name__, method_name=method_name, msg=msg
    )

    # Use the first class in the MRO that defines the method itself.
    cls = next(
        (parent for parent in inspect.getmro(cls) if method_name in parent.__dict__),
        cls,
    )

    # limit deprecation messages to once per package
    package_name = cls.__module__.split('.', 1)[0]
    if not _should_warn((package_name, msg)):
        return

    try:
        fname = inspect.getsourcefile(method) or "<unknown>"
        lineno = inspect.getsourcelines(method)[1] or 0
    except (IOError, TypeError) as e:
        # Failed to inspect for some reason
        warn(warn_msg + ('\n(inspection failed) %s' % e), DeprecationWarning)
        return
    warn_explicit(warn_msg, DeprecationWarning, fname, lineno)
def remote_exec(self, source, **kwargs):
    """ return channel object and connect it to a remote
        execution thread where the given ``source`` executes.

        * a string ``source`` is executed remotely with a ``channel``
          put into the global namespace.
        * a pure function ``source`` is serialized and called with
          ``**kwargs``, adding a ``channel`` object to the keyword
          arguments.
        * a pure module ``source`` has its source executed with a
          ``channel`` in its global namespace.

        In all cases the binding ``__name__='__channelexec__'``
        will be available in the global namespace of the remotely
        executing code.
    """
    if isinstance(source, types.ModuleType):
        call_name = None
        linecache.updatecache(inspect.getsourcefile(source))
        text = inspect.getsource(source)
    elif isinstance(source, types.FunctionType):
        call_name = source.__name__
        text = _source_of_function(source)
    else:
        call_name = None
        text = textwrap.dedent(str(source))

    # Keyword arguments only make sense when a function is called remotely.
    if kwargs and call_name is None:
        raise TypeError("can't pass kwargs to non-function remote_exec")

    channel = self.newchannel()
    payload = gateway_base.dumps_internal((text, call_name, kwargs))
    self._send(Message.CHANNEL_EXEC, channel.id, payload)
    return channel
def test_function_without_known_source_fails(self):
    """A function defined through exec has no retrievable source, so
    ``gateway._source_of_function`` must raise ``ValueError`` for it."""
    namespace = {}
    py.builtin.exec_('def fail(channel): pass', namespace, namespace)
    target = namespace['fail']
    print(inspect.getsourcefile(target))
    pytest.raises(ValueError, gateway._source_of_function, target)
def runtests():
    """
    Run all mpmath tests and print output.
    """
    import os.path
    from inspect import getsourcefile
    from .tests import runtests as tests

    # Directory holding the test runner module, and the directory two
    # levels above it, which is handed over as the import location.
    runner_file = os.path.abspath(getsourcefile(tests))
    testdir = os.path.dirname(runner_file)
    importdir = os.path.abspath(testdir + '/../..')
    tests.testit(importdir, testdir)