我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.getsource()。
def get_action(driver, keyword):
    """Return the action class that implements ``keyword`` for ``driver``.

    The driver module ``ProductDrivers.<driver>`` is imported and the source
    of its ``main`` routine is scanned for a ``package_list = [...]``
    assignment. Every module of every listed package is imported, and the
    first class that exposes a routine named ``keyword`` is returned.

    :param driver: name of the module inside the ``ProductDrivers`` package
    :param keyword: name of the routine to look for
    :return: the matching class, or ``None`` when no class provides it
    :raises IndexError: if the driver module has no ``main`` routine
    :raises AttributeError: if ``main`` contains no ``package_list`` list
    """
    drvmodobj = importlib.import_module('ProductDrivers.' + driver)
    drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
    main_method = [member for name, member in drvfile_methods
                   if name == 'main'][0]
    main_src = inspect.getsource(main_method)
    pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src,
                            re.MULTILINE | re.DOTALL)
    # The names come from raw source text, so "[a, b]" yields " b" and a
    # trailing comma yields "": strip whitespace and drop empty entries
    # before handing the names to the import machinery.
    pkglst = [name.strip() for name in pkglstmatch.group(1).split(',')]
    for pkg in pkglst:
        if not pkg:
            continue
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        action_modules = [pkg + '.' + name
                          for _, name, _ in pkgutil.iter_modules([pkgdir])]
        action_module_objs = [importlib.import_module(action_module)
                              for action_module in action_modules]
        for action_module_obj in action_module_objs:
            for _, action_class in inspect.getmembers(action_module_obj,
                                                      inspect.isclass):
                for func_name, _ in inspect.getmembers(action_class,
                                                       inspect.isroutine):
                    if keyword == func_name:
                        return action_class
    return None
def combineFunctions(self, functions):
    """
    generates a combined formula as used for the calculation

    Each function must be a lambda written inside parentheses, for example
    ``(lambda x: x + 1)``, because its body is recovered from the source text
    with a regular expression. The functions are combined so that the first
    one is applied first: every ``x`` of the already combined expression is
    textually replaced by the parenthesised body of the preceding function.

    @param functions the list of individual functions (lambda code)
    @return the combined formula ("" when no lambda could be extracted)
    @note out of scope is the simplification (like: (x+1)+1 => x+2)
    @note the substitution is purely textual, so any other "x" character in
          the expression (e.g. in "max") is replaced as well
    """
    # Raw string: the original non-raw literal relied on the invalid escape
    # sequences "\(" and "\)", which newer interpreters warn about.
    lambda_pattern = re.compile(r".*\((?P<expression>lambda.*)\).*")
    expression = ""
    for function in reversed(functions):
        match = lambda_pattern.match(inspect.getsource(function))
        if not match:
            continue
        functionCode = match.group("expression")
        # keep only the body, i.e. everything after the first colon
        functionCode = functionCode[functionCode.find(":") + 1:].strip()
        if not expression:
            expression = functionCode
        else:
            expression = expression.replace("x", "(" + functionCode + ")")
    return expression
def h_fun(fun):
    """
    Hash functions

    The "hash" is the source text of the function. It is cached on the
    function object as the attribute ``code`` so the source is read only once.

    :param fun: function to hash
    :type fun: function
    :return: hash of the function; the string ``"nocode"`` when the source
        cannot be retrieved
    """
    try:
        return fun.code
    except AttributeError:
        try:
            h = inspect.getsource(fun)
        except (IOError, OSError, TypeError):
            # OSError/IOError: the source file is not available;
            # TypeError: built-in objects have no Python source at all.
            h = "nocode"
        try:
            fun.code = h
        except AttributeError:
            # some callables (e.g. built-ins) do not accept new attributes
            pass
        return h
def get_decorators(cls):
    """Map each function defined directly in a class body to its decorators.

    :param cls: a class, or an instance (its class is inspected instead)
    :return: dict ``{function_name: [(decorator_name, [literal_args]), ...]}``;
        a function without decorators maps to an empty list. For a dotted
        decorator such as ``@a.b`` only the last attribute name is recorded.
    :raises AttributeError: if a decorator argument is not a literal
    """
    import textwrap

    decorators = {}
    constant_type = getattr(ast, 'Constant', None)

    def literal_value(arg):
        # ast.Constant carries the value in ``.value``; the legacy ``.s``
        # alias is deprecated, so it is used only when Constant is missing.
        if constant_type is not None and isinstance(arg, constant_type):
            return arg.value
        return arg.s

    def decorator_name(expr):
        return expr.attr if isinstance(expr, ast.Attribute) else expr.id

    def visit_FunctionDef(node):
        decorators[node.name] = []
        for n in node.decorator_list:
            if isinstance(n, ast.Call):
                name = decorator_name(n.func)
                args = [literal_value(a) for a in n.args]
            else:
                name = decorator_name(n)
                args = []
            decorators[node.name].append((name, args))

    node_iter = ast.NodeVisitor()
    # assigned on the instance, so it is called without ``self``
    node_iter.visit_FunctionDef = visit_FunctionDef
    _cls = cls if inspect.isclass(cls) else cls.__class__
    # A class defined inside a function or another class has indented source,
    # which ast.parse() rejects unless the common indentation is removed.
    node_iter.visit(ast.parse(textwrap.dedent(inspect.getsource(_cls))))
    return decorators
def __init__(self, *parts, **kwargs):
    """Collect the source lines of all ``parts`` into ``self.lines``.

    A part may be another ``Source``, a tuple/list of lines, a string, or any
    other object, whose source is then looked up with ``getsource``.

    Keyword arguments:
    deindent -- pass each part's lines through ``deindent`` (default True)
    rstrip   -- drop trailing blank lines of string parts (default True)
    """
    collected = []
    self.lines = collected
    dedent_parts = kwargs.get('deindent', True)
    strip_trailing = kwargs.get('rstrip', True)
    for part in parts:
        if not part:
            chunk = []
        # Deliberately a separate ``if``: a falsy part is still dispatched
        # on its type below, exactly as before.
        if isinstance(part, Source):
            chunk = part.lines
        elif isinstance(part, (tuple, list)):
            chunk = [entry.rstrip("\n") for entry in part]
        elif isinstance(part, py.builtin._basestring):
            chunk = part.split('\n')
            if strip_trailing:
                # remove blank lines from the end of the text
                while chunk and not chunk[-1].strip():
                    chunk.pop()
        else:
            chunk = getsource(part, deindent=dedent_parts).lines
        if dedent_parts:
            chunk = deindent(chunk)
        collected.extend(chunk)
def code_to_ast(code: types.CodeType, file: str = None) -> ast.Module:
    """
    Return node object for code object.

    :param code: code object whose source should be parsed; ``None`` is
        accepted and yields ``None``
    :param file: file name handed to ``source_to_ast``; defaults to the file
        reported by ``inspect.getfile``
    :return: the node built by ``source_to_ast``, or ``None`` when the source
        is not available
    :raises TypeError: if ``code`` is a truthy object that is not a code object
    """
    if code and not isinstance(code, types.CodeType):
        raise TypeError('Unexpected type: {}'.format(str(type(code))))
    try:
        src = inspect.getsource(code)
    except (OSError, TypeError):
        # OSError: no source text; TypeError: nothing inspectable (e.g. the
        # None that the guard above lets through).
        return None
    try:
        file = file or inspect.getfile(code)
        return source_to_ast(src, file)
    except IOError:
        return None
def module_to_ast(module: types.ModuleType, file: str = None) -> ast.Module:
    """
    Return node object for python module.

    :param module: module whose source should be parsed; ``None`` is accepted
        and yields ``None``
    :param file: file name handed to ``source_to_ast``; defaults to the file
        reported by ``inspect.getfile``
    :return: the node built by ``source_to_ast``, or ``None`` when the source
        is not available (for example for built-in modules)
    :raises TypeError: if ``module`` is a truthy object that is not a module
    """
    if module and not isinstance(module, types.ModuleType):
        raise TypeError('Unexpected type: {}'.format(str(type(module))))
    try:
        src = inspect.getsource(module)
    except (OSError, TypeError):
        # OSError: no source text; TypeError: built-in module or None.
        return None
    try:
        file = file or inspect.getfile(module)
        return source_to_ast(src, file)
    except IOError:
        return None
def class_to_ast(class_: type, file: str = None) -> ast.ClassDef:
    """
    Return node object for class.

    :param class_: class whose source should be parsed; ``None`` is accepted
        and yields ``None``
    :param file: file name handed to ``source_to_ast``; defaults to the file
        reported by ``inspect.getfile``
    :return: the node built by ``source_to_ast``, or ``None`` when the source
        is not available (for example for built-in classes)
    :raises TypeError: if ``class_`` is a truthy object that is not a class
    """
    if class_ and not isinstance(class_, type):
        raise TypeError('Unexpected type: {}'.format(str(type(class_))))
    try:
        src = inspect.getsource(class_)
    except (OSError, TypeError):
        # OSError: no source text; TypeError: built-in class or None.
        return None
    try:
        file = file or inspect.getfile(class_)
        return source_to_ast(src, file)
    except IOError:
        return None
def get_class_traits(klass):
    """ Yield ``(name, rhs, doc)`` for every assignment in a class body.

    ``name`` is the assigned name, ``rhs`` the unparsed right-hand side and
    ``doc`` the comment found for the assignment's line ('' when none).
    """
    # FIXME: gracefully handle errors here or in the caller?
    class_source = inspect.getsource(klass)
    blocker = CommentBlocker()
    blocker.process_file(StringIO(class_source))
    module_tree = compiler.parse(class_source)
    class_node = module_tree.node.nodes[0]
    for statement in class_node.code.nodes:
        # FIXME: handle other kinds of assignments?
        if not isinstance(statement, compiler.ast.Assign):
            continue
        name = statement.nodes[0].name
        rhs = unparse(statement.expr).strip()
        comment = blocker.search_for_comment(statement.lineno, default='')
        doc = strip_comment_marker(comment)
        yield name, rhs, doc
def _get_module_via_sys_modules(self, fullname):
    """Look up the source of an already imported module in sys.modules.

    This exists mainly for __main__, but may also cover a few other cases.

    Returns None when ``fullname`` is absent from sys.modules, is not a
    regular module, or ``self._py_filename`` rejects its file. Otherwise
    returns the tuple ``(path, source, is_pkg)``.
    """
    candidate = sys.modules.get(fullname)
    if not isinstance(candidate, types.ModuleType):
        LOG.debug('sys.modules[%r] absent or not a regular module',
                  fullname)
        return

    if not self._py_filename(getattr(candidate, '__file__', '')):
        return

    is_pkg = hasattr(candidate, '__path__')
    try:
        text = inspect.getsource(candidate)
    except IOError:
        # Work around inspect.getsourcelines() bug: only packages fall back
        # to a single newline as their source.
        if not is_pkg:
            raise
        text = '\n'

    # rstrip('co') maps a ".pyc"/".pyo" path back to the ".py" file
    path = candidate.__file__.rstrip('co')
    return (path, text, is_pkg)
def get_boot_command(self):
    """Build the argv list that boots the first stage via ``python -c``.

    The source of ``self._first_stage`` loses its first two lines, is
    dedented, has every space replaced by a tab and ``CONTEXT_NAME`` replaced
    by ``self.remote_name``. The result is zlib-compressed and
    base64-encoded into a one-line command that decodes and executes it.

    :return: ``[self.python_path, '-c', <bootstrap one-liner>]``
    """
    import base64
    import zlib

    source = inspect.getsource(self._first_stage)
    source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
    source = source.replace(' ', '\t')
    source = source.replace('CONTEXT_NAME', self.remote_name)
    # str.encode('zlib') / .encode('base64') exist only on Python 2; the
    # zlib and base64 modules give the same bytes on 2.x and 3.x.
    payload = source if isinstance(source, bytes) else source.encode('utf-8')
    encoded = base64.b64encode(zlib.compress(payload))
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    # We can't use bytes.decode() in 3.x since it was restricted to always
    # return unicode, so codecs.decode() is used instead. In 3.x
    # codecs.decode() requires a bytes object. Since we must be compatible
    # with 2.4 (no bytes literal), an extra .encode() either returns the
    # same str (2.x) or an equivalent bytes (3.x).
    return [
        self.python_path, '-c',
        'from codecs import decode as _;'
        'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
    ]
def merge_sources(*sources):
    """Concatenate the source of several objects, hoisting imports to the top.

    Every line that starts at column 0 with ``from `` or ``import `` is
    treated as an import, sorted (plain imports, then absolute ``from``
    imports, then relative ``from .`` imports) and placed first; all other
    lines follow in their original order.

    :param sources: objects accepted by ``getsource``
    :return: the merged source text, joined with newlines
    :note: continuation lines of a multi-line import stay in the body
    """
    # The trailing space matters: the bare prefixes 'from' / 'import' also
    # matched ordinary statements such as "important = 1".
    import_prefixes = ('from ', 'import ')

    def sort_imports(import_string):
        order = 0
        if import_string.startswith('import .'):
            order = 1
        elif import_string.startswith('from .'):
            order = 3
        elif import_string.startswith('from '):
            order = 2
        return order, import_string

    source_lines = [getsource(source).splitlines() for source in sources]
    imports = [line for lines in source_lines for line in lines
               if line.startswith(import_prefixes)]
    without_imports = (line for lines in source_lines for line in lines
                       if not line.startswith(import_prefixes))
    imports.sort(key=sort_imports)
    return '\n'.join(chain(imports, without_imports))
def build_lazy_ie(ie, name):
    """Render the source text of the lazy stand-in class for extractor ``ie``.

    The text is ``ie_template`` filled in with the class name, base names,
    ``_VALID_URL`` and module. The source of ``ie.suitable`` is appended when
    the extractor overrides it, followed by ``make_valid_template`` when the
    extractor defines ``_make_valid_url``.
    """
    pieces = [ie_template.format(
        name=name,
        bases=', '.join(map(get_base_name, ie.__bases__)),
        valid_url=getattr(ie, '_VALID_URL', None),
        module=ie.__module__)]
    overrides_suitable = (
        ie.suitable.__func__ is not InfoExtractor.suitable.__func__)
    if overrides_suitable:
        pieces.append('\n' + getsource(ie.suitable))
    if hasattr(ie, '_make_valid_url'):
        # search extractors
        pieces.append(
            make_valid_template.format(valid_url=ie._make_valid_url()))
    return ''.join(pieces)


# find the correct sorting and add the required base classes so that subclasses
# can be correctly created
def retrieve_data_string(data, verbose=True):
    """Return the source of ``data`` without its first line and return line.

    The text of the first source line and of the last line matching
    ``return`` is blanked out wherever it occurs, then the indentation width
    reported by ``determine_indent`` is cut from the start of every line.
    With ``verbose`` the numbered result is printed as well.
    """
    text = inspect.getsource(data)
    header = text.split("\n")[0]
    indent_width = len(determine_indent(text))
    text = text.replace(header, "")

    return_pattern = re.compile(r'^\s*return.*')
    return_lines = [candidate for candidate in reversed(text.split("\n"))
                    if return_pattern.match(candidate)]
    text = text.replace(return_lines[0], "")

    data_string = ''.join(
        row[indent_width:] + "\n" for row in text.split("\n"))
    if verbose:
        print(">>> Data")
        print(with_line_numbers(data_string))
    return data_string
def source(self):
    """
    Returns
    -------
    str
        Markdown text containing the source code of this action's callable.

    Raises
    ------
    TypeError
        If the source code of the callable cannot be retrieved.
    """
    try:
        code_text = inspect.getsource(self._callable)
    except OSError:
        raise TypeError(
            "Cannot retrieve source code for callable %r" %
            self._callable.__name__)
    else:
        return markdown_source_template % {'source': code_text}
def parse_func(n, func, prefix, f, s, typ='function'):
    '''Register the signature of ``func`` under the dotted name ``prefix + n``.

    The source is always stored in ``s.doc``. The function is added to
    ``s.funcs`` and ``s.reg`` only when its dotted name matches ``f.fmatch``.
    # TODO: py3 inspect changed.
    '''
    prefix += (n,)
    s.doc[prefix] = {'t': typ, 'd': getsource(func)}
    dotted = '.'.join(prefix[1:])
    if not matches(dotted, f.fmatch):
        return
    s.funcs.append(dotted)

    spec = getargspec(func)
    entry = {
        'pos_args': [],
        'doc': doc_tag_line(f, func),
        'arg_keys': spec.args,
        'args': {},
    }
    # Defaults belong to the trailing arguments, so walk the arguments
    # backwards; 'n.d.' marks an argument without a default.
    remaining_defaults = list(spec.defaults or ())
    for arg_name in reversed(spec.args):
        if remaining_defaults:
            default = remaining_defaults.pop()
        else:
            default = 'n.d.'
        entry['args'][arg_name] = default
        if default == 'n.d.':
            entry['pos_args'].insert(0, arg_name)
    s.reg[dotted] = entry
    l('added', 'function', prefix)
def bootstrap_exec(io, spec):
    """Send the bootstrap sources over ``io`` and wait for the '1' handshake.

    Raises HostNotFound when the stream hits EOF and ``io.wait()`` returns 255.
    """
    try:
        bootstrap_sources = (
            inspect.getsource(gateway_base),
            "execmodel = get_execmodel(%r)" % spec.execmodel,
            'io = init_popen_io(execmodel)',
            "io.write('1'.encode('ascii'))",
            "serve(io, id='%s-slave')" % spec.id,
        )
        sendexec(io, *bootstrap_sources)
        handshake = io.read(1)
        assert handshake == "1".encode('ascii')
    except EOFError:
        exit_status = io.wait()
        if exit_status == 255:
            raise HostNotFound(io.remoteaddress)
def bootstrap_socket(io, id):
    """Send the socket bootstrap sources over ``io`` and check the handshake.

    The sources include a fallback to the 'thread' execmodel for the case
    that no ``execmodel`` name is defined yet.
    """
    # XXX: switch to spec
    from execnet.gateway_socket import SocketIO

    bootstrap_sources = (
        inspect.getsource(gateway_base),
        'import socket',
        inspect.getsource(SocketIO),
        "try: execmodel",
        "except NameError:",
        " execmodel = get_execmodel('thread')",
        "io = SocketIO(clientsock, execmodel)",
        "io.write('1'.encode('ascii'))",
        "serve(io, id='%s-slave')" % id,
    )
    sendexec(io, *bootstrap_sources)
    handshake = io.read(1)
    assert handshake == "1".encode('ascii')
def listofmodeluserfunctions(self):
    """User functions of the model class.

    Returns ``(name, member)`` pairs in this order: functions of the model's
    class (other than ``run`` and ``new2old``) whose source mentions
    'fastaccess', then ``run`` if the class defines it, then bound methods
    stored on the model instance whose source mentions 'fastaccess'.
    """
    class_namespace = vars(self.model.__class__)
    found = [
        (name, member)
        for name, member in class_namespace.items()
        if inspect.isfunction(member)
        and name not in ('run', 'new2old')
        and 'fastaccess' in inspect.getsource(member)
    ]
    run = class_namespace.get('run')
    if run is not None:
        found.append(('run', run))
    found.extend(
        (name, member)
        for name, member in vars(self.model).items()
        if inspect.ismethod(member)
        and 'fastaccess' in inspect.getsource(member)
    )
    return found
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
    """
    Call lambda_expression once per second until it stops raising
    AssertionError; fail the test once ``timeout`` attempts have failed.
    """
    import time
    import inspect

    elapsed = 0
    while elapsed < timeout:
        try:
            lambda_expression()
            break
        except AssertionError:
            # not ready yet: wait a second and try again
            time.sleep(1)
            elapsed += 1
    failure_message = (
        "Timed out waiting for %s." % inspect.getsource(lambda_expression))
    self.assertLess(elapsed, timeout, failure_message)
def wait_until_true(self, lambda_expression, error_message='', timeout=TEST_TIMEOUT):
    """Evaluates lambda_expression with back-off until True or hits timeout.

    The wait between attempts starts at 0.01 s and grows tenfold after each
    failed attempt, capped at 1 s.

    :param lambda_expression: callable polled until it returns a truthy value
    :param error_message: str, or a callable returning the message; it is
        resolved only after polling has finished
    :param timeout: int, upper bound for the accumulated waiting time
    """
    assert hasattr(lambda_expression, '__call__'), \
        'lambda_expression is not callable: %s' % type(lambda_expression)
    assert hasattr(error_message, '__call__') or type(error_message) is str, \
        'error_message is not callable and not a str: %s' % type(error_message)
    assert type(timeout) == int, 'timeout is not an int: %s' % type(timeout)

    running_time = 0
    lambda_result = None
    wait_time = 0.01
    while not lambda_result and running_time < timeout:
        lambda_result = lambda_expression()
        logger.debug("%s evaluated to %s" % (inspect.getsource(lambda_expression), lambda_result))
        if not lambda_result:
            time.sleep(wait_time)
            # Account for the interval that was actually slept; the original
            # added the already enlarged next interval, so the timeout
            # fired before ``timeout`` seconds had really been waited.
            running_time += wait_time
            wait_time = min(1, wait_time * 10)

    if hasattr(error_message, '__call__'):
        error_message = error_message()

    self.assertLess(running_time, timeout,
                    'Timed out waiting for %s\nError Message %s' % (inspect.getsource(lambda_expression), error_message))
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
    """
    Evaluates lambda_expression once/1s until no AssertionError or hits timeout.

    The most recent AssertionError is logged at debug level and included in
    the failure message when the timeout is reached.
    """
    running_time = 0
    assertion = None
    while running_time < timeout:
        try:
            lambda_expression()
        # "except X as e" is accepted by Python 2.6+ and Python 3, unlike the
        # Python-2-only "except X, e" spelling.
        except AssertionError as e:
            assertion = e
            logger.debug("%s tripped assertion: %s" % (inspect.getsource(lambda_expression), e))
        else:
            break
        time.sleep(1)
        running_time += 1
    self.assertLess(running_time, timeout,
                    "Timed out waiting for %s\nAssertion %s" % (inspect.getsource(lambda_expression), assertion))
def test_proceed_with_fake_filename(self): '''doctest monkeypatches linecache to enable inspection''' fn, source = '<test>', 'def x(): pass\n' getlines = linecache.getlines def monkey(filename, module_globals=None): if filename == fn: return source.splitlines(True) else: return getlines(filename, module_globals) linecache.getlines = monkey try: ns = {} exec compile(source, fn, 'single') in ns inspect.getsource(ns["x"]) finally: linecache.getlines = getlines
def interpret_fraction(f):
    """Return a copy of ``f`` rebuilt from its AST by ``FractionInterpreter``.

    The source of ``f`` is parsed, transformed by the visitor, compiled and
    executed; the function then defined under ``f.__name__`` is returned.

    :param f: a plain Python function whose source is available
    """
    import textwrap

    assert inspect.isfunction(f)
    # dedent so that nested functions and methods (indented source) parse
    f_ast = ast.parse(textwrap.dedent(inspect.getsource(f)))
    visitor = FractionInterpreter()
    new_ast = visitor.visit(f_ast)
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')
    fake_locals = {}
    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    # NOTE: globals=None means the rebuilt function resolves global names in
    # this module, not in the module that defines ``f``.
    exec(co, None, fake_locals)
    return fake_locals[f.__name__]
def mess_control(f, *, debug=0):
    """Return a copy of ``f`` rebuilt from its AST by ``ControlMess``.

    The source of ``f`` is parsed, transformed by the visitor, compiled and
    executed; the function then defined under ``f.__name__`` is returned.

    :param f: a plain Python function whose source is available
    :param debug: when truthy, print the AST before and after the rewrite
    """
    import textwrap

    assert inspect.isfunction(f)
    # dedent so that nested functions and methods (indented source) parse
    f_ast = ast.parse(textwrap.dedent(inspect.getsource(f)))
    if debug:
        print("AST:", ast.dump(f_ast))
    visitor = ControlMess()
    new_ast = visitor.visit(f_ast)
    if debug:
        print('NEW AST:', ast.dump(new_ast))
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')
    fake_locals = {}
    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    # NOTE: globals=None means the rebuilt function resolves global names in
    # this module, not in the module that defines ``f``.
    exec(co, None, fake_locals)
    return fake_locals[f.__name__]
def local_settings_update(self, changes):
    """Update local_settings.py with new content created according to the
    changes parameter. The changes parameter should be a list generated by
    check_modules()

    The current source of local_settings is kept as a backup. If reloading
    the rewritten module raises ImportError, the backup is written back;
    otherwise the cached Django settings are reset.

    :raises SystemError: if the local_settings module is missing
    """
    if not local_settings:
        raise SystemError('Missing local_settings.py!')

    logger.info('Creating new local_settings.py with following changes: %s',
                self._show_changes(changes))
    target = inspect.getsourcefile(local_settings)
    data = self._local_settings_new(changes)
    backup = inspect.getsource(local_settings)
    # Logger.warn is a deprecated alias of Logger.warning
    logger.warning('Updating %s', target)
    self._save_file(target, data)

    try:
        reload_module(local_settings)
    except ImportError as e:
        logger.exception(e)
        logger.warning('Restoring %s from backup', target)
        self._save_file(target, backup)
    else:
        # Force reloading of django settings
        settings._wrapped = empty
def write_version_py(filename):
    """Write the version module to ``filename``.

    The file is ``VERSION_PY_CONTENT`` filled in with the package version,
    the git revision ('Unknown' when there is no '.git' directory), the
    release flag and the source code of ``get_git_version``.
    """
    # Adding the git rev number needs to be done inside write_version_py(),
    # otherwise the import of numpy.version messes up the build under Python 3.
    helper_source = inspect.getsource(get_git_version)
    revision = get_git_version() if os.path.exists('.git') else 'Unknown'
    substitutions = {
        'version': PACKAGE_VERSION,
        'git_revision': revision,
        'is_release': str(IS_RELEASE),
        'git_version_code': str(helper_source),
    }
    rendered = VERSION_PY_CONTENT % substitutions
    with open(filename, 'w') as version_file:
        version_file.write(rendered)


################################################################################
# Installation
################################################################################
def query(name):
    """Decorator factory that records the source of a query function.

    The dataset key is the name of the directory containing the calling
    module. The decorated function is returned unchanged; its source, from
    the first line containing "():" onwards and followed by a line
    ``FN = <function name>``, is appended to ``queries[dataset]`` together
    with ``name``.
    """
    # index 1 is the frame of the caller of query()
    caller = inspect.stack()[1]
    caller_module = inspect.getmodule(caller[0])
    dataset = os.path.abspath(caller_module.__file__).split('/')[-2]

    def wrapper(f):
        # Seems to include a trailing newline, hence the [:-1]
        source_lines = inspect.getsource(f).split('\n')[:-1]
        # Hacky way to get just the function body
        start = 0
        while "():" not in source_lines[start]:
            start += 1
        snippet = source_lines[start:] + ['FN = ' + f.__name__]
        queries[dataset].append([name, '\n'.join(snippet)])
        return f

    return wrapper
def test_proceed_with_fake_filename(self): '''doctest monkeypatches linecache to enable inspection''' fn, source = '<test>', 'def x(): pass\n' getlines = linecache.getlines def monkey(filename, module_globals=None): if filename == fn: return source.splitlines(keepends=True) else: return getlines(filename, module_globals) linecache.getlines = monkey try: ns = {} exec(compile(source, fn, 'single'), ns) inspect.getsource(ns["x"]) finally: linecache.getlines = getlines
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
    """Internal use only.

    Generator that submits a job built from `request`, `where`, `cpu`, `gen`
    and the call arguments to the scheduler, and yields the result of
    finishing the task that performs the exchange.

    `gen` is either the name of a function (str) or the function itself. No
    source code is attached when the name is in `self._xfer_funcs`; otherwise
    the source of `gen`, stripped of leading whitespace, is attached.
    """
    if isinstance(gen, str):
        name = gen
    else:
        # 'func_name' is the Python 2 spelling of a function's name attribute
        name = gen.func_name

    if name in self._xfer_funcs:
        code = None
    else:
        # if not inspect.isgeneratorfunction(gen):
        #     logger.warning('"%s" is not a valid generator function', name)
        #     raise StopIteration([])
        code = inspect.getsource(gen).lstrip()

    def _run_req(task=None):
        # Deliver the job to the scheduler; a result of 1 is treated as
        # success, and the reply is then awaited on this task.
        msg = {'req': 'job', 'auth': self._auth,
               'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
        if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
            reply = yield task.receive()
            if isinstance(reply, Task):
                if self.status_task:
                    # report the created task to the status task
                    msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                    self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                if not request.endswith('async'):
                    # requests not ending in 'async' wait for a second message
                    reply = yield task.receive()
        else:
            reply = None
        # the result is handed back by raising StopIteration with it
        raise StopIteration(reply)

    yield Task(_run_req).finish()
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
    """Internal use only.

    Generator that submits a job built from `request`, `where`, `cpu`, `gen`
    and the call arguments to the scheduler, and yields the result of
    finishing the task that performs the exchange.

    `gen` is either the name of a function (str) or the function itself. No
    source code is attached when the name is in `self._xfer_funcs`; otherwise
    the source of `gen`, stripped of leading whitespace, is attached.
    """
    if isinstance(gen, str):
        name = gen
    else:
        name = gen.__name__

    if name in self._xfer_funcs:
        code = None
    else:
        # if not inspect.isgeneratorfunction(gen):
        #     logger.warning('"%s" is not a valid generator function', name)
        #     raise StopIteration([])
        code = inspect.getsource(gen).lstrip()

    def _run_req(task=None):
        # Deliver the job to the scheduler; a result of 1 is treated as
        # success, and the reply is then awaited on this task.
        msg = {'req': 'job', 'auth': self._auth,
               'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
        if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
            reply = yield task.receive()
            if isinstance(reply, Task):
                if self.status_task:
                    # report the created task to the status task
                    msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                    self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                if not request.endswith('async'):
                    # requests not ending in 'async' wait for a second message
                    reply = yield task.receive()
        else:
            reply = None
        # NOTE(review): on Python 3.7+ a StopIteration raised inside a
        # generator is converted to RuntimeError (PEP 479) - confirm that the
        # Task runner is used on an interpreter where this still works.
        raise StopIteration(reply)

    yield Task(_run_req).finish()
def __new__(cls, name, bases, dct):
    """Create the class with ``__slots__`` derived from its ``__init__``.

    Every attribute assigned in the body of ``__init__`` (``obj.attr = ...``)
    becomes a slot, unless a direct base class already declares it. Slots
    declared explicitly in the class body are kept. Only plain assignment
    statements at the top level of ``__init__`` are considered.

    :param name: name of the class being created
    :param bases: tuple of base classes
    :param dct: class namespace; gains ``__slots__`` when slots are found
    :return: the new class
    """
    import textwrap

    declared = dct.get('__slots__', [])
    # __slots__ may be a single string or any iterable of names
    slots = [declared] if isinstance(declared, str) else list(declared)

    orig_slots = []
    for base in bases:
        base_slots = getattr(base, '__slots__', ())
        if isinstance(base_slots, str):
            base_slots = (base_slots,)
        orig_slots.extend(base_slots)

    if '__init__' in dct:
        # A throw-away class is built only so that inspect can locate the
        # source; dedent it because the class may be nested.
        initproc = type.__new__(cls, name, bases, dct)
        initproc_source = textwrap.dedent(inspect.getsource(initproc))
        tree = compile(initproc_source, "dont_care", 'exec',
                       _ast.PyCF_ONLY_AST)
        for declaration in tree.body[0].body:
            if not isinstance(declaration, _ast.FunctionDef):
                continue
            if declaration.name != '__init__':
                # remove this check if you do not initialize all instance
                # variables in __init__
                continue
            for statement in declaration.body:
                if not isinstance(statement, _ast.Assign):
                    continue
                for target in statement.targets:
                    # local variables, tuple unpacking and subscripts have
                    # no ``attr`` and do not name an instance attribute
                    if not isinstance(target, _ast.Attribute):
                        continue
                    attr = target.attr
                    if attr not in orig_slots and attr not in slots:
                        slots.append(attr)
    if slots:
        dct['__slots__'] = slots
    return type.__new__(cls, name, bases, dct)
def runtime(f):
    """Evaluates the given function each time it is called.

    The source of ``f`` (with its decorators removed) is re-executed on every
    call and the freshly defined function is invoked with the call's
    arguments.
    """
    # get the function's name
    name = f.__name__
    # and its source code, sans decorator
    source = remove_decorators(getsource(f))

    @wraps(f)
    def wrapped(*args, **kwargs):
        # Execute the declaration into an explicit namespace: relying on
        # exec() to create a local variable of this function is not
        # dependable, and calling through eval() on a formatted string
        # required every argument to survive a repr() round trip.
        namespace = {}
        exec(source, globals(), namespace)
        return namespace[name](*args, **kwargs)
    return wrapped
def get_info(obj):
    """Return the compact parameter list of ``obj``, e.g. ``"(a,b=1)"``.

    The first source line that contains no ``@`` is taken as the ``def``
    line. The ``def`` keyword, the trailing colon, all spaces, the object's
    name and a ``self`` parameter are removed from it.

    :param obj: function or method whose source is available
    :return: the parameter list as a string, or ``None`` if it cannot be
        determined (e.g. no source available)
    """
    try:
        lines = inspect.getsource(obj).split('\n')
        lines = [i for i in lines if '@' not in i]  # filter out decorators
        signature = (lines[0].replace('def ', '').replace('):', ')')
                     .replace(' ', '').replace(obj.__name__, ''))
        # Spaces are already gone at this point, so the separator after
        # ``self`` is a bare comma; matching 'self, ' left "(,a)" behind.
        return signature.replace('self,', '').replace('self', '')
    except Exception:
        # best effort, but let KeyboardInterrupt/SystemExit propagate
        return None
def get_source(func):
    """Return the source of ``func`` after ``correct_indentation``.

    Both the raw and the corrected source are printed.
    """
    original_text = inspect.getsource(func)
    print(original_text)
    fixed_text = correct_indentation(original_text)
    print(fixed_text)
    return fixed_text
def getsource(obj, **kwargs):
    """Return a ``Source`` object holding the source code of ``obj``.

    ``obj`` is first reduced with ``py.code.getrawcode``. Keyword arguments
    are passed on to ``Source``. If inspect fails with IndentationError, a
    placeholder string literal is used as the source instead.
    """
    raw = py.code.getrawcode(obj)
    try:
        text = inspect.getsource(raw)
    except IndentationError:
        text = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(text, str)
    return Source(text, **kwargs)
def getstatementrange_ast(lineno, source, assertion=False, astnode=None):
    """Locate the statement around ``lineno`` and return ``(astnode, start, end)``.

    ``source`` is compiled to an AST unless ``astnode`` is passed in. If the
    compilation raises ValueError, the result of ``getstatementrange_old`` is
    returned with ``None`` in place of the AST. ``assertion`` is only
    forwarded to that fallback.

    ``start`` and ``end`` index ``source.lines``; ``end`` is moved back so
    that it does not include a differently indented block, trailing comments
    or blank lines.
    """
    if astnode is None:
        content = str(source)
        if sys.version_info < (2,7):
            content += "\n"
        try:
            astnode = compile(content, "source", "exec", 1024)  # 1024 for AST
        except ValueError:
            start, end = getstatementrange_old(lineno, source, assertion)
            return None, start, end
    start, end = get_statement_startend2(lineno, astnode)
    # we need to correct the end:
    # - ast-parsing strips comments
    # - there might be empty lines
    # - we might have lesser indented code blocks at the end
    if end is None:
        end = len(source.lines)

    if end > start + 1:
        # make sure we don't span differently indented code blocks
        # by using the BlockFinder helper used which inspect.getsource() uses itself
        block_finder = inspect.BlockFinder()
        # if we start with an indented line, put blockfinder to "started" mode
        block_finder.started = source.lines[start][0].isspace()
        # feed the candidate lines, newline-terminated, to the tokenizer
        it = ((x + "\n") for x in source.lines[start:end])
        try:
            for tok in tokenize.generate_tokens(lambda: next(it)):
                block_finder.tokeneater(*tok)
        except (inspect.EndOfBlock, IndentationError):
            # block_finder.last is relative to ``start``
            end = block_finder.last + start
        except Exception:
            # any other tokenizer failure keeps the end computed so far
            pass

    # the end might still point to a comment or empty line, correct it
    while end:
        line = source.lines[end - 1].lstrip()
        if line.startswith("#") or not line:
            end -= 1
        else:
            break
    return astnode, start, end
def getsource(obj, **kwargs):
    """Return a ``Source`` object holding the source code of ``obj``.

    ``obj`` is first reduced with ``_pytest._code.getrawcode``. Keyword
    arguments are passed on to ``Source``. If inspect fails with
    IndentationError, a placeholder string literal is used as the source
    instead.
    """
    import _pytest._code

    raw = _pytest._code.getrawcode(obj)
    try:
        text = inspect.getsource(raw)
    except IndentationError:
        text = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(text, str)
    return Source(text, **kwargs)
def get_helper_functions(self):
    """Yield the helper functions as source text.

    Entries of ``self.helper_functions`` come first, followed by the entries
    of ``self.get_extra_helper_functions()``. Callables are replaced by
    their source code; everything else is yielded unchanged.
    """
    # The providers are called lazily, so the extra helpers are requested
    # only after the regular ones have been consumed.
    providers = (
        lambda: self.helper_functions,
        self.get_extra_helper_functions,
    )
    for provide in providers:
        for entry in provide():
            if callable(entry):
                yield inspect.getsource(entry)
            else:
                yield entry
def get_code():
    """
    Return the source code of the ``Dijkstra`` class as a string.
    """
    class_source = inspect.getsource(Dijkstra)
    return class_source
def get_code():
    """
    Return the source code of the ``OneDirectionalAStar`` class as a string.
    """
    class_source = inspect.getsource(OneDirectionalAStar)
    return class_source