我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用six.exec_()。
def parse_user_config(config, source_code=None): try: if source_code is None: with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f: source_code = f.read() scope = {} code = compile(source_code, config["base"]["strategy_file"], 'exec') six.exec_(code, scope) __config__ = scope.get("__config__", {}) deep_update(__config__, config) for sub_key, sub_dict in six.iteritems(__config__): if sub_key not in config["whitelist"]: continue deep_update(sub_dict, config[sub_key]) except Exception as e: system_log.error(_('in parse_user_config, exception: {e}').format(e=e)) finally: return config
def parse_user_config_from_code(config, source_code=None): try: if source_code is None: with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f: source_code = f.read() scope = {} code = compile(source_code, config["base"]["strategy_file"], 'exec') six.exec_(code, scope) __config__ = scope.get("__config__", {}) for sub_key, sub_dict in six.iteritems(__config__): if sub_key not in config["whitelist"]: continue deep_update(sub_dict, config[sub_key]) except Exception as e: system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e)) finally: return config
def get_wrapped(func, wrapper_template, evaldict): # Preserve the argspec for the wrapped function so that testing # tools such as pytest can continue to use their fixture injection. args, a, kw, defaults = inspect.getargspec(func) signature = inspect.formatargspec(args, a, kw, defaults) is_bound_method = hasattr(func, '__self__') if is_bound_method: args = args[1:] # Omit 'self' callargs = inspect.formatargspec(args, a, kw, None) ctx = {'signature': signature, 'funcargs': callargs} six.exec_(wrapper_template % ctx, evaldict) wrapper = evaldict['wrapper'] update_wrapper(wrapper, func) if is_bound_method: wrapper = wrapper.__get__(func.__self__, type(func.__self__)) return wrapper
def test_exec_(): def f(): l = [] six.exec_("l.append(1)") assert l == [1] f() ns = {} six.exec_("x = 42", ns) assert ns["x"] == 42 glob = {} loc = {} six.exec_("global y; y = 42; x = 12", glob, loc) assert glob["y"] == 42 assert "x" not in glob assert loc["x"] == 12 assert "y" not in loc
def test_big_linenos(self): def func(count): namespace = {} func = "def foo():\n " + "".join(["\n "] * count + ["spam\n"]) six.exec_(func, namespace) return namespace['foo'] # Test all small ranges big_lineno_format = _BIG_LINENO_FORMAT_36 if PY36 else _BIG_LINENO_FORMAT for i in range(1, 300): expected = big_lineno_format % (i + 2) self.do_disassembly(func(i), expected) # Test some larger ranges too for i in range(300, 5000, 10): expected = big_lineno_format % (i + 2) self.do_disassembly(func(i), expected)
def code_config(config, source_code=None): try: if source_code is None: with codecs.open(config["base"]["strategy_file"], encoding="utf-8") as f: source_code = f.read() # FIXME: hardcode for parametric mod def noop(*args, **kwargs): pass scope = {'define_parameter': noop} code = compile(source_code, config["base"]["strategy_file"], 'exec') six.exec_(code, scope) return scope.get('__config__', {}) except Exception as e: system_log.error(_(u"in parse_user_config, exception: {e}").format(e=e)) return {}
def load_module(self, full_name): orig = full_name.split('.')[-1] if orig not in sklearn.__all__: raise ImportError('%s not in sklearn' % orig) mod = sys.modules.setdefault(full_name, imp.new_module(full_name)) mod.__file__ = '' mod.__name__ = full_name mod.__path__ = '' mod.__loader__ = self mod.__package__ = '.'.join(full_name.split('.')[:-1]) code = _code.substitute({'mod_name': orig}) six.exec_(code, mod.__dict__) return mod
def _set_signature(mock, original, instance=False): # creates a function with signature (*args, **kwargs) that delegates to a # mock. It still does signature checking by calling a lambda with the same # signature as the original. if not _callable(original): return skipfirst = isinstance(original, ClassTypes) result = _get_signature_object(original, instance, skipfirst) if result is None: return func, sig = result def checksig(*args, **kwargs): sig.bind(*args, **kwargs) _copy_func_details(func, checksig) name = original.__name__ if not _isidentifier(name): name = 'funcopy' context = {'_checksig_': checksig, 'mock': mock} src = """def %s(*args, **kwargs): _checksig_(*args, **kwargs) return mock(*args, **kwargs)""" % name six.exec_(src, context) funcopy = context[name] _setup_func(funcopy, mock) return funcopy
def visitCode(self, code): if code.buffer: val = code.val.lstrip() val = self.var_processor(val) val = self._do_eval(val) if code.escape: val = str(val).replace('&', '&').replace('<', '<').replace('>', '>') self.buf.append(val) if code.block: self.visit(code.block) if not code.buffer and not code.block: six.exec_(code.val.lstrip(), self.global_context, self.local_context)
def load_module(self, fullname): if self.mod is None: self.mod = mod = imp.new_module(fullname) else: mod = self.mod mod.__file__ = '<%s>' % self.name mod.__loader__ = self mod.__project__ = self.project mod.__package__ = '' code = self.get_code(fullname) six.exec_(code, mod.__dict__) linecache.clearcache() if sys.version_info[:2] == (3, 3): sys.modules[fullname] = mod return mod
def remove_coding(text): """ Remove the coding comment, which six.exec_ doesn't like. """ sub_re = re.compile("^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE) return sub_re.sub("", text) # ------------------------------------------------------------------------------ # Template # ------------------------------------------------------------------------------
def byte_EXEC_STMT(self): stmt, globs, locs = self.popn(3) six.exec_(stmt, globs, locs)
def load_module(self, fullname): if self.mod is None: self.mod = mod = imp.new_module(fullname) else: mod = self.mod mod.__file__ = '<%s>' % self.name mod.__loader__ = self mod.__project__ = self.project mod.__package__ = '' code = self.get_code(fullname) six.exec_(code, mod.__dict__) linecache.clearcache() return mod
def execwrap(content, from_file=None): from_file = from_file or '<stdin>' if isinstance(content, six.string_types): content = compile_text(content, from_file=from_file) def _inner(): global_env = exec_globals() local_env = global_env six.exec_(content, global_env, local_env) return global_env globals_ = {} output_handler = None try: with reopen_stdout_stderr() as output_handler: globals_ = _inner() except Exception: if output_handler is not None: output = "%s%s" % (output_handler.read(), _exception(from_file)) else: output = _exception(from_file) else: output = output_handler.read() output = strutil.ensure_text(output) return output, globals_
def render_expression(expression, bind, stream=None): """Generate a SQL expression from the passed python expression. Only the global variable, `engine`, is available for use in the expression. Additional local variables may be passed in the context parameter. Note this function is meant for convenience and protected usage. Do NOT blindly pass user input to this function as it uses exec. :param bind: A SQLAlchemy engine or bind URL. :param stream: Render all DDL operations to the stream. """ # Create a stream if not present. if stream is None: stream = six.moves.cStringIO() engine = create_mock_engine(bind, stream) # Navigate the stack and find the calling frame that allows the # expression to execuate. for frame in inspect.stack()[1:]: try: frame = frame[0] local = dict(frame.f_locals) local['engine'] = engine six.exec_(expression, frame.f_globals, local) break except: pass else: raise ValueError('Not a valid python expression', engine) return stream
def compile_strategy(source_code, strategy, scope): try: code = compile(source_code, strategy, 'exec') six.exec_(code, scope) return scope except Exception as e: exc_type, exc_val, exc_tb = sys.exc_info() exc_val = patch_user_exc(exc_val, force=True) try: msg = str(exc_val) except Exception as e1: msg = "" six.print_(e1) error = CustomError() error.set_msg(msg) error.set_exc(exc_type, exc_val, exc_tb) stackinfos = list(traceback.extract_tb(exc_tb)) if isinstance(e, (SyntaxError, IndentationError)): error.add_stack_info(exc_val.filename, exc_val.lineno, "", exc_val.text) else: for item in stackinfos: filename, lineno, func_name, code = item if strategy == filename: error.add_stack_info(*item) # avoid empty stack if error.stacks_length == 0: error.add_stack_info(*item) raise CustomException(error)
def remove_coding(text): """ Remove the coding comment, which six.exec_ doesn't like. """ sub_re = re.compile("^#\s*-\*-\s*coding:\s*.*-\*-$", flags=re.MULTILINE) return sub_re.sub("", text) #------------------------------------------------------------------------------ # Template #------------------------------------------------------------------------------
def mock_engine(engine, stream=None): """Mocks out the engine specified in the passed bind expression. Note this function is meant for convenience and protected usage. Do NOT blindly pass user input to this function as it uses exec. :param engine: A python expression that represents the engine to mock. :param stream: Render all DDL operations to the stream. """ # Create a stream if not present. if stream is None: stream = six.moves.cStringIO() # Navigate the stack and find the calling frame that allows the # expression to execuate. for frame in inspect.stack()[1:]: try: frame = frame[0] expression = '__target = %s' % engine six.exec_(expression, frame.f_globals, frame.f_locals) target = frame.f_locals['__target'] break except: pass else: raise ValueError('Not a valid python expression', engine) # Evaluate the expression and get the target engine. frame.f_locals['__mock'] = create_mock_engine(target, stream) # Replace the target with our mock. six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals) # Give control back. yield stream # Put the target engine back. frame.f_locals['__target'] = target six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals) six.exec_('del __target', frame.f_globals, frame.f_locals) six.exec_('del __mock', frame.f_globals, frame.f_locals)
def main(argv=None): args = parser.parse_args(argv) args.nodepy_path.insert(0, '.') if args.version: print(VERSION) return 0 args.pmd = check_pmd_envvar() or args.pmd sys.argv = [sys.argv[0]] + args.request[1:] maindir = pathlib.Path(args.maindir) if args.maindir else pathlib.Path.cwd() ctx = nodepy.context.Context(maindir) # Updat the module search path. args.nodepy_path.insert(0, ctx.modules_directory) ctx.resolver.paths.extend(x for x in map(pathlib.Path, args.nodepy_path) if x.is_dir()) ctx.localimport.path.extend(args.python_path) # Create the module in which we run the REPL or the command # specified via -c. if args.c or not args.request: filename = nodepy.utils.path.VoidPath('<repl>') directory = pathlib.Path.cwd() repl_module = ReplModule(ctx, None, filename, directory) repl_module.init() with ctx.enter(): if args.pmd: install_pmd(ctx) if args.c: repl_module.set_exec_handler(lambda: six.exec_(args.c, vars(repl_module.namespace))) repl_module.load() if args.request: try: filename = path.urlpath.make(args.request[0]) except ValueError: filename = args.request[0] ctx.main_module = ctx.resolve(filename) if not args.keep_arg0: sys.argv[0] = str(ctx.main_module.filename) ctx.main_module.init() if args.pymain: ctx.main_module.namespace.__name__ = '__main__' ctx.load_module(ctx.main_module, do_init=False) elif not args.c: ctx.main_module = repl_module repl_module.set_exec_handler(lambda: code.interact('', local=vars(repl_module.namespace))) repl_module.load()