Python six 模块,exec_() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用six.exec_()

项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def parse_user_config(config, source_code=None):
    """Merge the ``__config__`` dict declared by a strategy file into *config*.

    The strategy source is compiled under the name
    ``config["base"]["strategy_file"]`` and executed in a fresh namespace;
    whatever that code binds to ``__config__`` is then folded into *config*
    through ``deep_update``.

    :param config: configuration mapping; ``config["base"]["strategy_file"]``
        and ``config["whitelist"]`` are read here.
    :param source_code: strategy source text. When ``None`` the strategy file
        is read from disk as UTF-8.
    :return: the same *config* object that was passed in.

    Any ``Exception`` raised while loading or merging is logged and
    suppressed.  ``KeyboardInterrupt`` / ``SystemExit`` are no longer
    swallowed: the ``return`` used to live in a ``finally`` clause, which
    silently discarded every in-flight exception.
    """
    try:
        if source_code is None:
            with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f:
                source_code = f.read()

        scope = {}

        code = compile(source_code, config["base"]["strategy_file"], 'exec')
        # NOTE: this executes arbitrary strategy code; only use trusted files.
        six.exec_(code, scope)

        user_config = scope.get("__config__", {})

        deep_update(user_config, config)

        for sub_key, sub_dict in six.iteritems(user_config):
            # Only sections named in the whitelist get the second update pass.
            if sub_key not in config["whitelist"]:
                continue
            deep_update(sub_dict, config[sub_key])

    except Exception as e:
        system_log.error(_('in parse_user_config, exception: {e}').format(e=e))

    return config
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def parse_user_config_from_code(config, source_code=None):
    """Apply the whitelisted sections of a strategy's ``__config__`` to *config*.

    The strategy source is compiled under the name
    ``config["base"]["strategy_file"]`` and executed in a fresh namespace.
    For every key of the resulting ``__config__`` that appears in
    ``config["whitelist"]``, ``deep_update(sub_dict, config[sub_key])`` is
    called.

    :param config: configuration mapping; ``config["base"]["strategy_file"]``
        and ``config["whitelist"]`` are read here.
    :param source_code: strategy source text. When ``None`` the strategy file
        is read from disk as UTF-8.
    :return: the same *config* object that was passed in.

    Any ``Exception`` raised along the way is logged and suppressed.
    ``KeyboardInterrupt`` / ``SystemExit`` now propagate: the ``return`` was
    previously placed in a ``finally`` clause, which discarded every
    in-flight exception.
    """
    try:
        if source_code is None:
            with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f:
                source_code = f.read()

        scope = {}

        code = compile(source_code, config["base"]["strategy_file"], 'exec')
        # NOTE: this executes arbitrary strategy code; only use trusted files.
        six.exec_(code, scope)

        user_config = scope.get("__config__", {})

        for sub_key, sub_dict in six.iteritems(user_config):
            # Sections outside the whitelist are ignored.
            if sub_key not in config["whitelist"]:
                continue
            deep_update(sub_dict, config[sub_key])

    except Exception as e:
        system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e))

    return config
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def get_wrapped(func, wrapper_template, evaldict):
    """Build a wrapper for *func* that exposes the same argument spec.

    *wrapper_template* is %-formatted with ``signature`` (the full formatted
    argspec) and ``funcargs`` (the argspec without defaults, used to forward
    the call), then executed inside *evaldict*.  The template is expected to
    define a name ``wrapper``, which is looked up in *evaldict* afterwards.

    Keeping the argspec lets tools such as pytest keep using fixture
    injection on the wrapped function.
    """
    arg_names, varargs, varkw, defaults = inspect.getargspec(func)
    bound = hasattr(func, '__self__')

    # A bound method already carries its instance, so the forwarded call
    # must omit the first positional name ('self').
    forwarded_names = arg_names[1:] if bound else arg_names

    ctx = {
        'signature': inspect.formatargspec(arg_names, varargs, varkw, defaults),
        'funcargs': inspect.formatargspec(forwarded_names, varargs, varkw, None),
    }
    six.exec_(wrapper_template % ctx, evaldict)

    wrapped = evaldict['wrapper']
    update_wrapper(wrapped, func)

    if not bound:
        return wrapped

    # Re-bind the generated function to the original method's instance.
    owner = func.__self__
    return wrapped.__get__(owner, type(owner))
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_exec_():
    """Check six.exec_ with no namespace, with globals only, and with both."""

    def run_without_namespace():
        # No namespace is passed, and the assertion requires that the
        # executed code reached the local list `l` of this function.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]

    run_without_namespace()

    # Globals only: the assignment must land in the supplied dict.
    only_globals = {}
    six.exec_("x = 42", only_globals)
    assert only_globals["x"] == 42

    # Globals and locals: `global y` goes to the first dict, plain `x`
    # to the second.
    global_ns, local_ns = {}, {}
    six.exec_("global y; y = 42; x = 12", global_ns, local_ns)
    assert global_ns["y"] == 42
    assert "x" not in global_ns
    assert local_ns["x"] == 12
    assert "y" not in local_ns
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_exec_():
    """Verify where six.exec_ stores names for each namespace combination."""

    def implicit_namespace_case():
        # Called without explicit namespaces; the executed statement has to
        # mutate this function's own local list for the assert to hold.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]

    implicit_namespace_case()

    # Single dict passed: it receives the new binding.
    namespace = {}
    six.exec_("x = 42", namespace)
    assert namespace["x"] == 42

    # Separate dicts: names declared global go to the first, the rest to
    # the second.
    module_scope = {}
    function_scope = {}
    six.exec_("global y; y = 42; x = 12", module_scope, function_scope)
    assert module_scope["y"] == 42
    assert "x" not in module_scope
    assert function_scope["x"] == 12
    assert "y" not in function_scope
项目:python-xdis    作者:rocky    | 项目源码 | 文件源码
def test_big_linenos(self):
        """Disassemble functions whose body starts after many blank lines.

        For each line count a ``foo`` function is generated whose single
        statement sits ``count`` blank lines below the ``def``, and its
        disassembly is compared against the big-line-number format string.
        """
        def build_foo(blank_lines):
            # Source: the def line, `blank_lines` empty indented lines, then
            # the one-statement body.
            source = "def foo():\n " + "\n " * blank_lines + "spam\n"
            scope = {}
            six.exec_(source, scope)
            return scope['foo']

        if PY36:
            lineno_format = _BIG_LINENO_FORMAT_36
        else:
            lineno_format = _BIG_LINENO_FORMAT

        # First every small count, then a sparser sweep over larger counts.
        for line_counts in (range(1, 300), range(300, 5000, 10)):
            for count in line_counts:
                expected = lineno_format % (count + 2)
                self.do_disassembly(build_foo(count), expected)
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def code_config(config, source_code=None):
    """Return the ``__config__`` dict declared by a strategy's source code.

    The source (read as UTF-8 from ``config["base"]["strategy_file"]`` when
    *source_code* is ``None``) is compiled and executed in a namespace that
    only pre-defines a do-nothing ``define_parameter``.  The value bound to
    ``__config__`` is returned, or ``{}`` when it is absent or when any
    ``Exception`` occurs (the exception is logged).
    """
    try:
        strategy_path = config["base"]["strategy_file"]
        if source_code is None:
            with codecs.open(strategy_path, encoding="utf-8") as strategy_fh:
                source_code = strategy_fh.read()

        # FIXME: hardcode for parametric mod
        def noop(*args, **kwargs):
            pass

        namespace = {'define_parameter': noop}
        six.exec_(compile(source_code, strategy_path, 'exec'), namespace)
        return namespace.get('__config__', {})
    except Exception as e:
        system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e))
        return {}
项目:six    作者:benjaminp    | 项目源码 | 文件源码
def test_exec_():
    """Cover the three calling conventions of six.exec_."""

    def exec_in_enclosing_frame():
        # No dicts supplied: the statement must act on this function's
        # local `l`, otherwise the assertion below fails.
        l = []
        six.exec_("l.append(1)")
        assert l == [1]

    exec_in_enclosing_frame()

    # One dict supplied: it is where `x` ends up.
    shared = {}
    six.exec_("x = 42", shared)
    assert shared["x"] == 42

    # Two dicts supplied: `global y` targets the first one, the ordinary
    # assignment to `x` targets the second one.
    globs, locs = {}, {}
    six.exec_("global y; y = 42; x = 12", globs, locs)
    assert globs["y"] == 42
    assert "x" not in globs
    assert locs["x"] == 12
    assert "y" not in locs
项目:ibex    作者:atavory    | 项目源码 | 文件源码
def load_module(self, full_name):
        """Create (or reuse) the module *full_name* and populate it from ``_code``.

        The last dotted component of *full_name* must be listed in
        ``sklearn.__all__``; otherwise ``ImportError`` is raised.  The module
        is registered in ``sys.modules``, given its import metadata, and then
        filled by executing the ``_code`` template with ``mod_name`` set to
        that last component.
        """
        parts = full_name.split('.')
        orig = parts[-1]

        if orig not in sklearn.__all__:
            raise ImportError('%s not in sklearn' % orig)

        # setdefault keeps an already-registered module instead of replacing it.
        module = sys.modules.setdefault(full_name, imp.new_module(full_name))

        metadata = (
            ('__file__', ''),
            ('__name__', full_name),
            ('__path__', ''),
            ('__loader__', self),
            ('__package__', '.'.join(parts[:-1])),
        )
        for attr_name, attr_value in metadata:
            setattr(module, attr_name, attr_value)

        source = _code.substitute({'mod_name': orig})
        six.exec_(source, module.__dict__)

        return module
项目:auger    作者:laffra    | 项目源码 | 文件源码
def _set_signature(mock, original, instance=False):
    """Create a ``(*args, **kwargs)`` function that checks arguments, then calls *mock*.

    The generated function first calls a checker that binds the arguments
    against the signature obtained for *original*, and then delegates to
    *mock*.  Returns ``None`` when *original* is not callable or no
    signature object can be obtained; otherwise returns the generated
    function after passing it through ``_setup_func``.
    """
    if not _callable(original):
        return

    signature_info = _get_signature_object(
        original, instance, isinstance(original, ClassTypes)
    )
    if signature_info is None:
        return
    func, sig = signature_info

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    # Fall back to a fixed name when the original's name cannot be used
    # as an identifier in the generated source.
    name = original.__name__
    name = name if _isidentifier(name) else 'funcopy'

    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    namespace = {'_checksig_': checksig, 'mock': mock}
    six.exec_(src, namespace)

    generated = namespace[name]
    _setup_func(generated, mock)
    return generated
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def _set_signature(mock, original, instance=False):
    """Wrap *mock* in a generated function that validates call arguments.

    A function named after *original* (or ``funcopy`` if that name is not a
    valid identifier) is generated with signature ``(*args, **kwargs)``.  It
    binds the arguments against the signature looked up for *original* and
    then forwards the call to *mock*.  Nothing is returned when *original*
    is not callable or has no signature object.
    """
    if not _callable(original):
        return

    skipfirst = isinstance(original, ClassTypes)
    lookup = _get_signature_object(original, instance, skipfirst)
    if lookup is None:
        return
    func, sig = lookup

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    name = original.__name__
    if not _isidentifier(name):
        name = 'funcopy'

    # The generated source reaches the checker and the mock through the
    # names provided in the exec namespace.
    exec_scope = {'_checksig_': checksig, 'mock': mock}
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    six.exec_(src, exec_scope)

    delegate = exec_scope[name]
    _setup_func(delegate, mock)
    return delegate
项目:pypugjs    作者:matannoam    | 项目源码 | 文件源码
def visitCode(self, code):
        """Handle a code node: emit its value, visit its block, or execute it.

        * Buffered code is run through ``var_processor`` and ``_do_eval``,
          optionally escaped for ``&``, ``<`` and ``>``, and appended to
          ``self.buf``.
        * A nested block, if any, is visited.
        * Code that is neither buffered nor has a block is executed with
          ``six.exec_`` in the visitor's global/local contexts.
        """
        if code.buffer:
            rendered = self._do_eval(self.var_processor(code.val.lstrip()))
            if code.escape:
                rendered = str(rendered)
                # '&' must be replaced first so the entities added for '<'
                # and '>' are not escaped a second time.
                for raw, entity in (('&', '&amp;'), ('<', '&lt;'), ('>', '&gt;')):
                    rendered = rendered.replace(raw, entity)
            self.buf.append(rendered)
        if code.block:
            self.visit(code.block)
        if not (code.buffer or code.block):
            six.exec_(code.val.lstrip(), self.global_context, self.local_context)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def load_module(self, fullname):
        """Return the loader's module after executing its code in it.

        The module object is created once and cached on ``self.mod``; later
        calls reuse it.  Its metadata attributes are (re)assigned, the code
        from ``self.get_code(fullname)`` is executed in the module's
        namespace, and the ``linecache`` cache is cleared.  On Python 3.3
        only, the module is also stored in ``sys.modules``.
        """
        module = self.mod
        if module is None:
            module = imp.new_module(fullname)
            self.mod = module

        module.__file__ = '<%s>' % self.name
        module.__loader__ = self
        module.__project__ = self.project
        module.__package__ = ''

        six.exec_(self.get_code(fullname), module.__dict__)
        linecache.clearcache()

        running_py33 = sys.version_info[:2] == (3, 3)
        if running_py33:
            sys.modules[fullname] = module
        return module
项目:terra    作者:UW-Hydro    | 项目源码 | 文件源码
def remove_coding(text):
    """
    Remove the coding comment, which six.exec_ doesn't like.

    Every line of *text* that consists solely of an Emacs-style
    ``# -*- coding: ... -*-`` comment is replaced by an empty line (the
    newline itself is kept, so line numbers do not shift).

    :param text: source code as a string.
    :return: *text* with the coding comment lines blanked out.
    """
    # Raw string: the pattern is unchanged, but the regex escapes are no
    # longer invalid string escapes, which newer Pythons warn about.
    sub_re = re.compile(r"^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE)
    return sub_re.sub("", text)

# ------------------------------------------------------------------------------
# Template
# ------------------------------------------------------------------------------
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def byte_EXEC_STMT(self):
        """Pop three values (statement, globals, locals) and execute the statement."""
        source, global_ns, local_ns = self.popn(3)
        six.exec_(source, global_ns, local_ns)
项目:fulmar    作者:tylderen    | 项目源码 | 文件源码
def load_module(self, fullname):
        """Execute the loader's code inside its (cached) module and return it.

        ``self.mod`` holds the module between calls; it is created on first
        use.  The metadata attributes are set on every call, the code from
        ``self.get_code(fullname)`` is executed in the module namespace, and
        ``linecache`` is cleared afterwards.
        """
        if self.mod is not None:
            module = self.mod
        else:
            module = imp.new_module(fullname)
            self.mod = module

        module.__file__ = '<%s>' % self.name
        module.__loader__ = self
        module.__project__ = self.project
        module.__package__ = ''

        compiled = self.get_code(fullname)
        six.exec_(compiled, module.__dict__)
        linecache.clearcache()
        return module
项目:Snipping    作者:yittg    | 项目源码 | 文件源码
def execwrap(content, from_file=None):
    """Execute *content* while capturing its output.

    String content is first compiled with ``compile_text`` under the name
    *from_file* (``'<stdin>'`` when not given).  The code runs with a single
    dict from ``exec_globals()`` used as both globals and locals, inside
    ``reopen_stdout_stderr()``.

    Returns ``(output, globals_)``: the text read from the output handler
    (followed by the formatted exception when the code raised) and the
    namespace the code ran in (``{}`` when it raised).
    """
    if not from_file:
        from_file = '<stdin>'
    if isinstance(content, six.string_types):
        content = compile_text(content, from_file=from_file)

    def _inner():
        env = exec_globals()
        # Same dict for globals and locals.
        six.exec_(content, env, env)
        return env

    globals_ = {}
    output_handler = None
    try:
        with reopen_stdout_stderr() as output_handler:
            globals_ = _inner()
    except Exception:
        if output_handler is None:
            # The failure happened before any output handler was bound.
            output = _exception(from_file)
        else:
            output = "%s%s" % (output_handler.read(),
                               _exception(from_file))
    else:
        output = output_handler.read()

    return strutil.ensure_text(output), globals_
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def render_expression(expression, bind, stream=None):
    """Generate a SQL expression from the passed python expression.

    The expression is executed in the namespace of the first calling frame
    in which it runs without raising.  A copy of that frame's locals is
    used, with one extra name, `engine`, bound to a mock engine created
    from `bind` and `stream`.

    Note this function is meant for convenience and protected usage. Do NOT
    blindly pass user input to this function as it uses exec.

    :param expression: Python source to execute; it may refer to `engine`.
    :param bind: A SQLAlchemy engine or bind URL.
    :param stream: Render all DDL operations to the stream.
    :return: The stream (a new ``cStringIO`` when none was passed).
    :raises ValueError: If the expression fails in every calling frame.
    """

    # Create a stream if not present.

    if stream is None:
        stream = six.moves.cStringIO()

    engine = create_mock_engine(bind, stream)

    # Navigate the stack and find the calling frame that allows the
    # expression to execute.

    for frame_info in inspect.stack()[1:]:
        caller = frame_info[0]
        # Work on a copy so `engine` never leaks into the caller's locals.
        scope = dict(caller.f_locals)
        scope['engine'] = engine
        try:
            six.exec_(expression, caller.f_globals, scope)
        except Exception:
            # Expression does not work in this frame; try the next one up.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            continue
        else:
            break
    else:
        raise ValueError('Not a valid python expression', engine)

    return stream
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def compile_strategy(source_code, strategy, scope):
    """Compile *source_code* as file *strategy* and execute it in *scope*.

    Returns *scope* on success.  On any ``Exception`` a ``CustomError`` is
    filled with the message, the exception triple and stack information,
    and raised wrapped in ``CustomException``.  For syntax errors the stack
    information comes from the exception itself; otherwise it comes from
    the traceback entries whose filename equals *strategy*.
    """
    try:
        six.exec_(compile(source_code, strategy, 'exec'), scope)
        return scope
    except Exception as e:
        err_type, err_value, err_tb = sys.exc_info()
        err_value = patch_user_exc(err_value, force=True)
        try:
            msg = str(err_value)
        except Exception as str_failure:
            msg = ""
            six.print_(str_failure)

        error = CustomError()
        error.set_msg(msg)
        error.set_exc(err_type, err_value, err_tb)
        tb_entries = list(traceback.extract_tb(err_tb))

        if not isinstance(e, (SyntaxError, IndentationError)):
            for item in tb_entries:
                filename, lineno, func_name, text = item
                if filename == strategy:
                    error.add_stack_info(*item)
            # avoid empty stack: fall back to the last traceback entry
            if error.stacks_length == 0:
                error.add_stack_info(*item)
        else:
            error.add_stack_info(err_value.filename, err_value.lineno, "", err_value.text)

        raise CustomException(error)
项目:opcsim    作者:dhhagan    | 项目源码 | 文件源码
def remove_coding(text):
    """
    Remove the coding comment, which six.exec_ doesn't like.

    Each line of *text* that is nothing but an Emacs-style
    ``# -*- coding: ... -*-`` comment is replaced by an empty line; the
    line break is preserved so following line numbers stay the same.

    :param text: source code as a string.
    :return: *text* with the coding comment lines blanked out.
    """
    # Written as a raw string so the regex escapes are not treated as
    # (invalid) string escapes; the compiled pattern is unchanged.
    sub_re = re.compile(r"^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE)
    return sub_re.sub("", text)

#------------------------------------------------------------------------------
# Template
#------------------------------------------------------------------------------
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def mock_engine(engine, stream=None):
    """Mocks out the engine specified in the passed bind expression.

    This is a generator: it yields the stream once, after replacing the
    target named by `engine` in the caller's namespace with a mock engine,
    and puts the original target back when it is resumed or closed.  The
    restore step runs in a ``finally`` clause, so it also happens when an
    exception is thrown into the generator at the ``yield``.

    Note this function is meant for convenience and protected usage. Do NOT
    blindly pass user input to this function as it uses exec.

    :param engine: A python expression that represents the engine to mock.
    :param stream: Render all DDL operations to the stream.
    :raises ValueError: If the expression cannot be evaluated in any
        calling frame.
    """

    # Create a stream if not present.

    if stream is None:
        stream = six.moves.cStringIO()

    # Navigate the stack and find the calling frame that allows the
    # expression to execute.

    for frame_info in inspect.stack()[1:]:
        caller = frame_info[0]
        try:
            expression = '__target = %s' % engine
            six.exec_(expression, caller.f_globals, caller.f_locals)
            target = caller.f_locals['__target']
        except Exception:
            # Not resolvable in this frame; try the next one up.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            continue
        else:
            break
    else:
        raise ValueError('Not a valid python expression', engine)

    # Evaluate the expression and get the target engine.

    caller.f_locals['__mock'] = create_mock_engine(target, stream)

    # Replace the target with our mock.

    six.exec_('%s = __mock' % engine, caller.f_globals, caller.f_locals)

    try:
        # Give control back.
        yield stream
    finally:
        # Put the target engine back, even if the caller's block raised.
        caller.f_locals['__target'] = target
        six.exec_('%s = __target' % engine, caller.f_globals, caller.f_locals)
        six.exec_('del __target', caller.f_globals, caller.f_locals)
        six.exec_('del __mock', caller.f_globals, caller.f_locals)
项目:nodepy    作者:nodepy    | 项目源码 | 文件源码
def main(argv=None):
  """Command-line entry point: run a requested module, a ``-c`` command, or a REPL.

  :param argv: argument list handed to ``parser.parse_args``.
  :return: ``0`` after printing the version; otherwise the function falls
    off the end and returns ``None``.

  Side effects visible in this function: ``sys.argv`` is rewritten, and the
  parsed ``args.nodepy_path`` list is mutated.
  """
  args = parser.parse_args(argv)
  # The current directory is searched first.
  args.nodepy_path.insert(0, '.')
  if args.version:
    print(VERSION)
    return 0

  # A truthy result from check_pmd_envvar() takes precedence over args.pmd.
  args.pmd = check_pmd_envvar() or args.pmd
  # Keep the original argv[0]; the first request item is dropped and the
  # remaining items become the program's arguments.
  sys.argv = [sys.argv[0]] + args.request[1:]

  maindir = pathlib.Path(args.maindir) if args.maindir else pathlib.Path.cwd()
  ctx = nodepy.context.Context(maindir)

  # Update the module search path.
  args.nodepy_path.insert(0, ctx.modules_directory)
  # Only entries that are existing directories are added to the resolver.
  ctx.resolver.paths.extend(x for x in map(pathlib.Path, args.nodepy_path) if x.is_dir())
  ctx.localimport.path.extend(args.python_path)

  # Create the module in which we run the REPL or the command
  # specified via -c.
  if args.c or not args.request:
    filename = nodepy.utils.path.VoidPath('<repl>')
    directory = pathlib.Path.cwd()
    repl_module = ReplModule(ctx, None, filename, directory)
    repl_module.init()

  with ctx.enter():
    if args.pmd:
      install_pmd(ctx)
    if args.c:
      # Run the -c source inside the REPL module's namespace.
      repl_module.set_exec_handler(lambda: six.exec_(args.c, vars(repl_module.namespace)))
      repl_module.load()
    if args.request:
      # NOTE(review): `path` is not defined in this function; elsewhere the
      # code spells it `nodepy.utils.path` -- confirm it is imported at
      # module level.
      try:
        filename = path.urlpath.make(args.request[0])
      except ValueError:
        # Fall back to the raw request string.
        filename = args.request[0]
      ctx.main_module = ctx.resolve(filename)
      if not args.keep_arg0:
        sys.argv[0] = str(ctx.main_module.filename)
      ctx.main_module.init()
      if args.pymain:
        ctx.main_module.namespace.__name__ = '__main__'
      # init() was already called above, hence do_init=False.
      ctx.load_module(ctx.main_module, do_init=False)
    elif not args.c:
      # Neither a request nor -c: start an interactive session in the
      # REPL module's namespace.
      ctx.main_module = repl_module
      repl_module.set_exec_handler(lambda: code.interact('', local=vars(repl_module.namespace)))
      repl_module.load()