Python inspect 模块,getsource() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.getsource()

项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def get_action(driver, keyword):
    """get action class corresponding to the keyword in the driver
    """
    drvmod = 'ProductDrivers.' + driver
    drvmodobj = importlib.import_module(drvmod)
    drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
    main_method = [item[1] for item in drvfile_methods if item[0] == 'main'][0]
    main_src = inspect.getsource(main_method)
    pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
    pkglst = pkglstmatch.group(1).split(',')
    for pkg in pkglst:
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        action_modules = [pkg+'.'+name for _, name, _ in pkgutil.iter_modules([pkgdir])]
        action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
        for action_module_obj in action_module_objs:
            for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
                for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
                    if keyword == func_name[0]:
                        return action_class[1]
    return None
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def combineFunctions(self, functions):
        """ generates a combined formula as used for the calculation
            @param functions the list of individual functions (lambda code)
            @return the combined formula
            @note out of scope is the simplification (like: (x+1)+1 => x+2)
        """
        expression = ""
        for function in reversed(functions):
            match = re.match(".*\((?P<expression>lambda.*)\).*", inspect.getsource(function))
            if match:
                functionCode = match.group("expression")
                functionCode = functionCode[functionCode.find(":")+1:].strip()
                if not len(expression):
                    expression = functionCode
                else:
                    expression = expression.replace("x", "("+functionCode+")")
        return expression
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_fun(fun):
    """
    Hash functions

    :param fun: function to hash
    :type  fun: function
    :return: hash of the function
    """
    try:
        return fun.code
    except AttributeError:
        try:
            h = inspect.getsource(fun)
        except IOError:
            h = "nocode"
        try:
            fun.code = h
        except AttributeError:
            pass
        return h
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_fun(fun):
    """
    Hash functions

    :param fun: function to hash
    :type  fun: function
    :return: hash of the function
    """
    try:
        return fun.code
    except AttributeError:
        try:
            h = inspect.getsource(fun)
        except IOError:
            h = "nocode"
        try:
            fun.code = h
        except AttributeError:
            pass
        return h
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_fun(fun):
    """
    Hash functions

    :param fun: function to hash
    :type  fun: function
    :return: hash of the function
    """
    try:
        return fun.code
    except AttributeError:
        try:
            h = inspect.getsource(fun)
        except IOError:
            h = "nocode"
        try:
            fun.code = h
        except AttributeError:
            pass
        return h
项目:bothub-sdk-python    作者:bothub-studio    | 项目源码 | 文件源码
def get_decorators(cls):
    decorators = {}

    def visit_FunctionDef(node):
        decorators[node.name] = []
        for n in node.decorator_list:
            name = ''
            if isinstance(n, ast.Call):
                name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id
            else:
                name = n.attr if isinstance(n, ast.Attribute) else n.id

            args = [a.s for a in n.args] if hasattr(n, 'args') else []
            decorators[node.name].append((name, args))

    node_iter = ast.NodeVisitor()
    node_iter.visit_FunctionDef = visit_FunctionDef
    _cls = cls if inspect.isclass(cls) else cls.__class__
    node_iter.visit(ast.parse(inspect.getsource(_cls)))
    return decorators
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, *parts, **kwargs):
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            if isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, *parts, **kwargs):
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            if isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
项目:opyum    作者:Amper    | 项目源码 | 文件源码
def code_to_ast(code: types.CodeType, file: str = None) -> ast.Module:
    """
    Return node object for code object.
    """

    if code and not isinstance(code, types.CodeType):
        raise TypeError('Unexpected type: {}'.format(str(type(code))))

    result = None

    try:
        src = inspect.getsource(code)
        file = file or inspect.getfile(code)
        result = source_to_ast(src, file)
    except IOError:
        pass

    return result
项目:opyum    作者:Amper    | 项目源码 | 文件源码
def module_to_ast(module: types.ModuleType, file: str = None) -> ast.Module:
    """
    Return node object for python module.
    """

    if module and not isinstance(module, types.ModuleType):
        raise TypeError('Unexpected type: {}'.format(str(type(module))))

    result = None

    try:
        src = inspect.getsource(module)
        file = file or inspect.getfile(module)
        result = source_to_ast(src, file)
    except IOError:
        pass

    return result
项目:opyum    作者:Amper    | 项目源码 | 文件源码
def class_to_ast(class_: type, file: str = None) -> ast.ClassDef:
    """

    """

    if class_ and not isinstance(class_, type):
        raise TypeError('Unexpected type: {}'.format(str(type(class_))))

    result = None

    try:
        src = inspect.getsource(class_)
        file = file or inspect.getfile(class_)
        result = source_to_ast(src, file)
    except IOError:
        pass

    return result
项目:palladio    作者:slipguru    | 项目源码 | 文件源码
def get_class_traits(klass):
    """ Yield all of the documentation for trait definitions on a class object.
    """
    # FIXME: gracefully handle errors here or in the caller?
    source = inspect.getsource(klass)
    cb = CommentBlocker()
    cb.process_file(StringIO(source))
    mod_ast = compiler.parse(source)
    class_ast = mod_ast.node.nodes[0]
    for node in class_ast.code.nodes:
        # FIXME: handle other kinds of assignments?
        if isinstance(node, compiler.ast.Assign):
            name = node.nodes[0].name
            rhs = unparse(node.expr).strip()
            doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
            yield name, rhs, doc
项目:mitogen    作者:dw    | 项目源码 | 文件源码
def _get_module_via_sys_modules(self, fullname):
        """Attempt to fetch source code via sys.modules. This is specifically
        to support __main__, but it may catch a few more cases."""
        module = sys.modules.get(fullname)
        if not isinstance(module, types.ModuleType):
            LOG.debug('sys.modules[%r] absent or not a regular module',
                      fullname)
            return

        modpath = self._py_filename(getattr(module, '__file__', ''))
        if not modpath:
            return

        is_pkg = hasattr(module, '__path__')
        try:
            source = inspect.getsource(module)
        except IOError:
            # Work around inspect.getsourcelines() bug.
            if not is_pkg:
                raise
            source = '\n'

        return (module.__file__.rstrip('co'),
                source,
                hasattr(module, '__path__'))
项目:mitogen    作者:dw    | 项目源码 | 文件源码
def get_boot_command(self):
        source = inspect.getsource(self._first_stage)
        source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
        source = source.replace('    ', '\t')
        source = source.replace('CONTEXT_NAME', self.remote_name)
        encoded = source.encode('zlib').encode('base64').replace('\n', '')
        # We can't use bytes.decode() in 3.x since it was restricted to always
        # return unicode, so codecs.decode() is used instead. In 3.x
        # codecs.decode() requires a bytes object. Since we must be compatible
        # with 2.4 (no bytes literal), an extra .encode() either returns the
        # same str (2.x) or an equivalent bytes (3.x).
        return [
            self.python_path, '-c',
            'from codecs import decode as _;'
            'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
        ]
项目:portia2code    作者:scrapinghub    | 项目源码 | 文件源码
def merge_sources(*sources):
    def sort_imports(import_string):
        order = 0
        if import_string.startswith('import .'):
            order = 1
        elif import_string.startswith('from .'):
            order = 3
        elif import_string.startswith('from '):
            order = 2
        return order, import_string
    sources = [getsource(source).splitlines() for source in sources]
    imports = []
    for source in sources:
        for line in source:
            if line.startswith(('from', 'import')):
                imports.append(line)
    without_imports = (line for source in chain(sources) for line in source
                       if not line.startswith(('from', 'import')))
    imports.sort(key=sort_imports)
    return '\n'.join(chain(imports, without_imports))
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def build_lazy_ie(ie, name):
    valid_url = getattr(ie, '_VALID_URL', None)
    s = ie_template.format(
        name=name,
        bases=', '.join(map(get_base_name, ie.__bases__)),
        valid_url=valid_url,
        module=ie.__module__)
    if ie.suitable.__func__ is not InfoExtractor.suitable.__func__:
        s += '\n' + getsource(ie.suitable)
    if hasattr(ie, '_make_valid_url'):
        # search extractors
        s += make_valid_template.format(valid_url=ie._make_valid_url())
    return s


# find the correct sorting and add the required base classes so that sublcasses
# can be correctly created
项目:bolero    作者:rock-learning    | 项目源码 | 文件源码
def get_class_traits(klass):
    """ Yield all of the documentation for trait definitions on a class object.
    """
    # FIXME: gracefully handle errors here or in the caller?
    source = inspect.getsource(klass)
    cb = CommentBlocker()
    cb.process_file(StringIO(source))
    mod_ast = compiler.parse(source)
    class_ast = mod_ast.node.nodes[0]
    for node in class_ast.code.nodes:
        # FIXME: handle other kinds of assignments?
        if isinstance(node, compiler.ast.Assign):
            name = node.nodes[0].name
            rhs = unparse(node.expr).strip()
            doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
            yield name, rhs, doc
项目:hyperas    作者:maxpumperla    | 项目源码 | 文件源码
def retrieve_data_string(data, verbose=True):
    data_string = inspect.getsource(data)
    first_line = data_string.split("\n")[0]
    indent_length = len(determine_indent(data_string))
    data_string = data_string.replace(first_line, "")
    r = re.compile(r'^\s*return.*')
    last_line = [s for s in reversed(data_string.split("\n")) if r.match(s)][0]
    data_string = data_string.replace(last_line, "")

    split_data = data_string.split("\n")
    for i, line in enumerate(split_data):
        split_data[i] = line[indent_length:] + "\n"
    data_string = ''.join(split_data)
    if verbose:
        print(">>> Data")
        print(with_line_numbers(data_string))
    return data_string
项目:qiime2    作者:qiime2    | 项目源码 | 文件源码
def source(self):
        """

        Returns
        -------
        str
            The source code of this action's callable formatted as Markdown
            text.

        """
        try:
            source = inspect.getsource(self._callable)
        except OSError:
            raise TypeError(
                "Cannot retrieve source code for callable %r" %
                self._callable.__name__)
        return markdown_source_template % {'source': source}
项目:ubc    作者:axiros    | 项目源码 | 文件源码
def parse_func(n, func, prefix, f, s, typ='function'):
    '''inspect function signature
    # TODO: py3 inspect changed.
    '''
    prefix += (n,)
    s.doc[prefix] = {'t': typ, 'd': getsource(func)}
    fn = '.'.join(prefix[1:])
    if not matches(fn, f.fmatch):
        return
    s.funcs.append(fn)
    args = getargspec(func)
    m = { 'pos_args': [], 'doc': doc_tag_line(f, func)
        , 'arg_keys': args.args, 'args': {}}
    defs = list(args.defaults or ())
    for a in reversed(args.args):
        d = m['args'][a] = defs.pop() if defs else 'n.d.'
        if d == 'n.d.':
            m['pos_args'].insert(0, a)
    #m['sig'] = getsig(func, args, m)
    s.reg[fn] = m
    l('added', 'function' , prefix)
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def bootstrap_exec(io, spec):
    try:
        sendexec(
            io,
            inspect.getsource(gateway_base),
            "execmodel = get_execmodel(%r)" % spec.execmodel,
            'io = init_popen_io(execmodel)',
            "io.write('1'.encode('ascii'))",
            "serve(io, id='%s-slave')" % spec.id,
        )
        s = io.read(1)
        assert s == "1".encode('ascii')
    except EOFError:
        ret = io.wait()
        if ret == 255:
            raise HostNotFound(io.remoteaddress)
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def bootstrap_socket(io, id):
    # XXX: switch to spec
    from execnet.gateway_socket import SocketIO

    sendexec(
        io,
        inspect.getsource(gateway_base),
        'import socket',
        inspect.getsource(SocketIO),
        "try: execmodel",
        "except NameError:",
        "   execmodel = get_execmodel('thread')",
        "io = SocketIO(clientsock, execmodel)",
        "io.write('1'.encode('ascii'))",
        "serve(io, id='%s-slave')" % id,
    )
    s = io.read(1)
    assert s == "1".encode('ascii')
项目:hydpy    作者:tyralla    | 项目源码 | 文件源码
def listofmodeluserfunctions(self):
        """User functions of the model class."""
        lines = []
        for (name, member) in vars(self.model.__class__).items():
            if (inspect.isfunction(member) and
                    (name not in ('run', 'new2old')) and
                    ('fastaccess' in inspect.getsource(member))):
                lines.append((name, member))
        run = vars(self.model.__class__).get('run')
        if run is not None:
            lines.append(('run', run))
        for (name, member) in vars(self.model).items():
            if (inspect.ismethod(member) and
                    ('fastaccess' in inspect.getsource(member))):
                lines.append((name, member))
        return lines
项目:dalila    作者:slipguru    | 项目源码 | 文件源码
def get_class_traits(klass):
    """ Yield all of the documentation for trait definitions on a class object.
    """
    # FIXME: gracefully handle errors here or in the caller?
    source = inspect.getsource(klass)
    cb = CommentBlocker()
    cb.process_file(StringIO(source))
    mod_ast = compiler.parse(source)
    class_ast = mod_ast.node.nodes[0]
    for node in class_ast.code.nodes:
        # FIXME: handle other kinds of assignments?
        if isinstance(node, compiler.ast.Assign):
            name = node.nodes[0].name
            rhs = unparse(node.expr).strip()
            doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
            yield name, rhs, doc
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
        """
        Evaluates lambda_expression once/1s until no AssertionError or hits
        timeout.
        """
        import time
        import inspect

        running_time = 0
        while running_time < timeout:
            try:
                lambda_expression()
            except AssertionError:
                pass
            else:
                break
            time.sleep(1)
            running_time += 1
        self.assertLess(running_time, timeout, "Timed out waiting for %s." % inspect.getsource(lambda_expression))
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def wait_until_true(self, lambda_expression, error_message='', timeout=TEST_TIMEOUT):
        """Evaluates lambda_expression once/1s until True or hits timeout."""
        assert hasattr(lambda_expression, '__call__'), 'lambda_expression is not callable: %s' % type(lambda_expression)
        assert hasattr(error_message, '__call__') or type(error_message) is str, 'error_message is not callable and not a str: %s' % type(error_message)
        assert type(timeout) == int, 'timeout is not an int: %s' % type(timeout)

        running_time = 0
        lambda_result = None
        wait_time = 0.01
        while not lambda_result and running_time < timeout:
            lambda_result = lambda_expression()
            logger.debug("%s evaluated to %s" % (inspect.getsource(lambda_expression), lambda_result))

            if not lambda_result:
                time.sleep(wait_time)
                wait_time = min(1, wait_time * 10)
                running_time += wait_time

        if hasattr(error_message, '__call__'):
            error_message = error_message()

        self.assertLess(running_time,
                        timeout,
                        'Timed out waiting for %s\nError Message %s' % (inspect.getsource(lambda_expression), error_message))
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
        """
        Evaluates lambda_expression once/1s until no AssertionError or hits
        timeout.
        """
        running_time = 0
        assertion = None
        while running_time < timeout:
            try:
                lambda_expression()
            except AssertionError, e:
                assertion = e
                logger.debug("%s tripped assertion: %s" % (inspect.getsource(lambda_expression), e))
            else:
                break
            time.sleep(1)
            running_time += 1
        self.assertLess(running_time,
                        timeout,
                        "Timed out waiting for %s\nAssertion %s" % (inspect.getsource(lambda_expression), assertion))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_proceed_with_fake_filename(self):
        '''doctest monkeypatches linecache to enable inspection'''
        fn, source = '<test>', 'def x(): pass\n'
        getlines = linecache.getlines
        def monkey(filename, module_globals=None):
            if filename == fn:
                return source.splitlines(True)
            else:
                return getlines(filename, module_globals)
        linecache.getlines = monkey
        try:
            ns = {}
            exec compile(source, fn, 'single') in ns
            inspect.getsource(ns["x"])
        finally:
            linecache.getlines = getlines
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_proceed_with_fake_filename(self):
        '''doctest monkeypatches linecache to enable inspection'''
        fn, source = '<test>', 'def x(): pass\n'
        getlines = linecache.getlines
        def monkey(filename, module_globals=None):
            if filename == fn:
                return source.splitlines(True)
            else:
                return getlines(filename, module_globals)
        linecache.getlines = monkey
        try:
            ns = {}
            exec compile(source, fn, 'single') in ns
            inspect.getsource(ns["x"])
        finally:
            linecache.getlines = getlines
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def interpret_fraction(f):
    assert inspect.isfunction(f)
    members = dict(inspect.getmembers(f))
    global_dict = members["__globals__"]
    source_filename = inspect.getsourcefile(f)
    f_ast = ast.parse(inspect.getsource(f))
    _, starting_line = inspect.getsourcelines(f)
    # ast_demo.increment_lineno(f_ast, starting_line - 1)
    # print("AST:", ast_demo.dump(f_ast))
    visitor = FractionInterpreter()
    new_ast = visitor.visit(f_ast)
    # print(ast_demo.dump(new_ast))
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')
    fake_locals = {}
    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    exec(co, None, fake_locals)
    # new_f = locals()[visitor._new_func_name(f.__name__)]
    return fake_locals[f.__name__]
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def mess_control(f, *, debug=0):
    assert inspect.isfunction(f)
    members = dict(inspect.getmembers(f))
    global_dict = members["__globals__"]
    source_filename = inspect.getsourcefile(f)
    f_ast = ast.parse(inspect.getsource(f))
    _, starting_line = inspect.getsourcelines(f)
    # ast_demo.increment_lineno(f_ast, starting_line - 1)
    if debug:
        print("AST:", ast.dump(f_ast))
    visitor = ControlMess()
    new_ast = visitor.visit(f_ast)
    if debug:
        print('NEW AST:', ast.dump(new_ast))
    ast.fix_missing_locations(new_ast)
    co = compile(new_ast, '<ast_demo>', 'exec')
    fake_locals = {}
    # exec will define the new function into fake_locals scope
    # this is to avoid conflict with vars in the real locals()
    # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
    exec(co, None, fake_locals)
    # new_f = locals()[visitor._new_func_name(f.__name__)]
    return fake_locals[f.__name__]
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, *parts, **kwargs):
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            if isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, *parts, **kwargs):
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            if not part:
                partlines = []
            if isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)
项目:icing    作者:slipguru    | 项目源码 | 文件源码
def get_class_traits(klass):
    """ Yield all of the documentation for trait definitions on a class object.
    """
    # FIXME: gracefully handle errors here or in the caller?
    source = inspect.getsource(klass)
    cb = CommentBlocker()
    cb.process_file(StringIO(source))
    mod_ast = compiler.parse(source)
    class_ast = mod_ast.node.nodes[0]
    for node in class_ast.code.nodes:
        # FIXME: handle other kinds of assignments?
        if isinstance(node, compiler.ast.Assign):
            name = node.nodes[0].name
            rhs = unparse(node.expr).strip()
            doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
            yield name, rhs, doc
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def local_settings_update(self, changes):
        """Update local_settings.py with new content created according to the changes parameter.
         The changes parameter should be a list generated by check_modules()"""
        if not local_settings:
            raise SystemError('Missing local_settings.py!')

        logger.info('Creating new local_settings.py with following changes: %s', self._show_changes(changes))
        target = inspect.getsourcefile(local_settings)
        data = self._local_settings_new(changes)
        backup = inspect.getsource(local_settings)

        logger.warn('Updating %s', target)
        self._save_file(target, data)

        try:
            reload_module(local_settings)
        except ImportError as e:
            logger.exception(e)
            logger.warn('Restoring %s from backup', target)
            self._save_file(target, backup)
        else:
            # Force reloading of django settings
            settings._wrapped = empty
项目:contact_map    作者:dwhswenson    | 项目源码 | 文件源码
def write_version_py(filename):
    # Adding the git rev number needs to be done inside write_version_py(),
    # otherwise the import of numpy.version messes up the build under Python 3.
    git_version_code = inspect.getsource(get_git_version)
    if os.path.exists('.git'):
        GIT_REVISION = get_git_version()
    else:
        GIT_REVISION = 'Unknown'

    content = VERSION_PY_CONTENT % {
        'version': PACKAGE_VERSION,
        'git_revision': GIT_REVISION,
        'is_release': str(IS_RELEASE),
        'git_version_code': str(git_version_code)
    }

    with open(filename, 'w') as version_file:
        version_file.write(content)


################################################################################
# Installation
################################################################################
项目:esper    作者:scanner-research    | 项目源码 | 文件源码
def query(name):
    frame = inspect.stack()[1]
    module = inspect.getmodule(frame[0])
    filename = module.__file__
    dataset = os.path.abspath(filename).split('/')[-2]

    def wrapper(f):
        lines = inspect.getsource(f).split('\n')
        lines = lines[:-1]  # Seems to include a trailing newline

        # Hacky way to get just the function body
        i = 0
        while True:
            if "():" in lines[i]:
                break
            i = i + 1

        fn = lines[i:]
        fn += ['FN = ' + f.__name__]
        queries[dataset].append([name, '\n'.join(fn)])

        return f

    return wrapper
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_proceed_with_fake_filename(self):
        '''doctest monkeypatches linecache to enable inspection'''
        fn, source = '<test>', 'def x(): pass\n'
        getlines = linecache.getlines
        def monkey(filename, module_globals=None):
            if filename == fn:
                return source.splitlines(keepends=True)
            else:
                return getlines(filename, module_globals)
        linecache.getlines = monkey
        try:
            ns = {}
            exec(compile(source, fn, 'single'), ns)
            inspect.getsource(ns["x"])
        finally:
            linecache.getlines = getlines
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.
        """
        if isinstance(gen, str):
            name = gen
        else:
            name = gen.func_name

        if name in self._xfer_funcs:
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                reply = yield task.receive()
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                if not request.endswith('async'):
                    reply = yield task.receive()
            else:
                reply = None
            raise StopIteration(reply)

        yield Task(_run_req).finish()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
        """Internal use only.
        """
        if isinstance(gen, str):
            name = gen
        else:
            name = gen.__name__

        if name in self._xfer_funcs:
            code = None
        else:
            # if not inspect.isgeneratorfunction(gen):
            #     logger.warning('"%s" is not a valid generator function', name)
            #     raise StopIteration([])
            code = inspect.getsource(gen).lstrip()

        def _run_req(task=None):
            msg = {'req': 'job', 'auth': self._auth,
                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                reply = yield task.receive()
                if isinstance(reply, Task):
                    if self.status_task:
                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
                if not request.endswith('async'):
                    reply = yield task.receive()
            else:
                reply = None
            raise StopIteration(reply)

        yield Task(_run_req).finish()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __new__(cls, name, bases, dct):
    slots = dct.get('__slots__', [])
    orig_slots = []
    for base in bases:
        if hasattr(base, "__slots__"):
        orig_slots += base.__slots__
    if '__init__' in dct:
        init = dct['__init__']
        initproc = type.__new__(cls, name, bases, dct)
        initproc_source = inspect.getsource(initproc)
        ast = compile(initproc_source, "dont_care", 'exec', _ast.PyCF_ONLY_AST)
        classdef = ast.body[0]
        stmts = classdef.body
        for declaration in stmts:
        if isinstance(declaration, _ast.FunctionDef):
            name1 = declaration.name
            if name1 == '__init__': # delete this line if you do not initialize all instance variables in __init__
            initbody = declaration.body
            for statement in initbody:
                if isinstance(statement, _ast.Assign):
                for target in statement.targets:
                    name1 = target.attr
                    if name1 not in orig_slots:
                    slots.append(name1)
        if slots:
        dct['__slots__'] = slots
    return type.__new__(cls, name, bases, dct)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def runtime(f):
    """Evaluates the given function each time it is called."""
    # get the function's name
    name = f.__name__
    # and its source code, sans decorator
    source = remove_decorators(getsource(f))
    @wraps(f)
    def wrapped(*args, **kwargs):
        # execute the function's declaration
        exec(source)
        # since the above overwrites its name in the local
        # scope we can call it here using eval
        return eval("%s(*%s, **%s)" % (name, args, kwargs))
    return wrapped
项目:mycroft-light    作者:MatthewScholefield    | 项目源码 | 文件源码
def get_info(obj):
    try:
        lines = inspect.getsource(obj).split('\n')
        lines = [i for i in lines if '@' not in i]  # filter out decorators
        return lines[0].replace('def ', '').replace('):', ')').replace('  ', '').replace(
            obj.__name__, '').replace('self, ', '').replace('self', '')
    except:
        return None
项目:catalearn    作者:Catalearn    | 项目源码 | 文件源码
def get_source(func):
    raw_source = inspect.getsource(func)
    print(raw_source)
    corrected_source = correct_indentation(raw_source)
    print(corrected_source)
    return corrected_source
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getsource(obj, **kwargs):
    obj = py.code.getrawcode(obj)
    try:
        strsrc = inspect.getsource(obj)
    except IndentationError:
        strsrc = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(strsrc, str)
    return Source(strsrc, **kwargs)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getstatementrange_ast(lineno, source, assertion=False, astnode=None):
    if astnode is None:
        content = str(source)
        if sys.version_info < (2,7):
            content += "\n"
        try:
            astnode = compile(content, "source", "exec", 1024)  # 1024 for AST
        except ValueError:
            start, end = getstatementrange_old(lineno, source, assertion)
            return None, start, end
    start, end = get_statement_startend2(lineno, astnode)
    # we need to correct the end:
    # - ast-parsing strips comments
    # - there might be empty lines
    # - we might have lesser indented code blocks at the end
    if end is None:
        end = len(source.lines)

    if end > start + 1:
        # make sure we don't span differently indented code blocks
        # by using the BlockFinder helper used which inspect.getsource() uses itself
        block_finder = inspect.BlockFinder()
        # if we start with an indented line, put blockfinder to "started" mode
        block_finder.started = source.lines[start][0].isspace()
        it = ((x + "\n") for x in source.lines[start:end])
        try:
            for tok in tokenize.generate_tokens(lambda: next(it)):
                block_finder.tokeneater(*tok)
        except (inspect.EndOfBlock, IndentationError):
            end = block_finder.last + start
        except Exception:
            pass

    # the end might still point to a comment or empty line, correct it
    while end:
        line = source.lines[end - 1].lstrip()
        if line.startswith("#") or not line:
            end -= 1
        else:
            break
    return astnode, start, end
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getsource(obj, **kwargs):
    import _pytest._code
    obj = _pytest._code.getrawcode(obj)
    try:
        strsrc = inspect.getsource(obj)
    except IndentationError:
        strsrc = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(strsrc, str)
    return Source(strsrc, **kwargs)
项目:django-modeltrans    作者:zostera    | 项目源码 | 文件源码
def get_helper_functions(self):
        def to_str(fn):
            return inspect.getsource(fn) if callable(fn) else fn

        for fn in self.helper_functions:
            yield to_str(fn)

        for fn in self.get_extra_helper_functions():
            yield to_str(fn)
项目:pygorithm    作者:OmkarPathak    | 项目源码 | 文件源码
def get_code():
        """
        returns the code for the current class
        """
        return inspect.getsource(Dijkstra)
项目:pygorithm    作者:OmkarPathak    | 项目源码 | 文件源码
def get_code():
        """
        returns the code for the current class
        """
        return inspect.getsource(OneDirectionalAStar)