我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用contextlib.contextmanager()。
def reconfigure(**kwargs):
    """Temporarily apply ``kwargs`` as configuration overrides.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager`` (the decorator is not visible in this
    excerpt).  The overrides are applied before the yield.  On exit the
    namespace is cleared and rebuilt from its values at that moment, with
    every overridden key reset to the value it had on entry; overridden
    keys that were absent on entry are dropped.
    """
    conf_namespace = staticconf.config.get_namespace(namespace)

    # Remember the pre-override value of every key we are about to touch.
    saved = {}
    for key, value in conf_namespace.get_config_values().iteritems():
        if key in kwargs:
            saved[key] = value

    configure_from_dict(kwargs)
    try:
        yield
    finally:
        # Keep whatever else is configured now, then put the saved values back.
        restored = {}
        for key, value in conf_namespace.get_config_values().iteritems():
            if key not in kwargs:
                restored[key] = value
        restored.update(saved)
        staticconf.config.get_namespace(namespace).clear()
        configure_from_dict(restored)
def capture_new_data_pipeline_messages(topic):
    """Yield a reader for decoded data pipeline messages from ``topic``.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  It enters ``capture_new_messages(topic)``
    and, while that context is open, yields a function that fetches raw
    messages through it and decodes each one with
    ``create_from_offset_and_message``.  Per the original documentation the
    underlying context starts at the tail of the topic and the reader does
    not block.

    Returns:
        Callable[[int], List[Message]]: Function taking one optional
        argument, ``count`` (default 100), and returning up to ``count``
        decoded data pipeline messages.
    """
    def get_data_pipeline_messages(count=100):
        decoded = []
        for kafka_message in get_kafka_messages(count):
            decoded.append(create_from_offset_and_message(kafka_message))
        return decoded

    # ``get_kafka_messages`` is bound here, before the reader can be called.
    with capture_new_messages(topic) as get_kafka_messages:
        yield get_data_pipeline_messages
def randommock():
    """Return a context-manager factory that pins ``random.random()``.

    The returned callable takes the value ``random.random()`` should
    return while the context is active.

    Usage::

        def test_something(randommock):
            with randommock(0.55):
                # test stuff...
    """
    @contextlib.contextmanager
    def _randommock(value):
        patcher = mock.patch('random.random')
        with patcher as mock_random:
            mock_random.return_value = value
            yield

    return _randommock
def _get_logger_for_contextmanager(log):
    """Resolve the logger a context manager should use.

    Parameters
    ----------
    log : Logger or None
        The explicit logger passed to the context manager.

    Returns
    -------
    log : Logger
        ``log`` itself when one was given, otherwise the logger looked up
        for the frame three levels above this call.
    """
    if log is None:
        # Three frames up: out of the context manager, out of
        # @contextmanager, and into the top level calling frame.
        return _logger_for_frame(_getframe(3))
    return log
def test_generator_based_context_manager_throw():
    # A generator-based context manager wrapped with enable_ki_protection
    # must report KI protection both on entry and in its cleanup, whether
    # the with-body exits normally or by raising.
    @contextlib.contextmanager
    @_core.enable_ki_protection
    def protected_manager():
        assert _core.currently_ki_protected()
        try:
            yield
        finally:
            # Runs on normal exit and when the exception is thrown in.
            assert _core.currently_ki_protected()

    # Normal exit: the body itself must not be protected.
    with protected_manager():
        assert not _core.currently_ki_protected()

    # Exceptional exit: the KeyError is thrown into the generator.
    with pytest.raises(KeyError):
        # This is the one that used to fail
        with protected_manager():
            raise KeyError
def cross_validation_lock(self):
    """Run a block with ``self._cross_validation_lock`` set.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  If the lock is already set, the block
    runs and the lock is left untouched.  Otherwise the lock is set to
    True for the duration of the block and reset to False afterwards,
    even if the block raises.
    """
    if self._cross_validation_lock:
        # Already locked by an outer caller: nothing to set or undo.
        yield
        return

    try:
        self._cross_validation_lock = True
        yield
    finally:
        self._cross_validation_lock = False
def logged_in():
    """Yield a ``_login(user)`` context-manager factory, then clear tokens.

    Generator with a single ``yield`` (presumably used as a pytest fixture;
    the decorator is not visible in this excerpt).  After the consumer is
    done, ``_TOKENS`` is emptied.

    ``_login(user)`` resets ``ctx_stack.top.jwt_user``, pushes a token on
    ``_TOKENS`` and yields the logged-in user.  Passing the string
    ``'NOT_LOGGED_IN'`` pushes ``None`` and yields ``None`` instead.  On
    exit the token is popped and ``jwt_user`` is reset again.

    Teardown now runs in ``finally`` blocks, so a body that raises no
    longer leaves a stale token or ``jwt_user`` behind.
    """
    @contextlib.contextmanager
    def _login(user):
        setattr(ctx_stack.top, 'jwt_user', None)
        if isinstance(user, str) and user == 'NOT_LOGGED_IN':
            _TOKENS.append(None)
            res = None
        else:
            _TOKENS.append(
                flask_jwt.create_access_token(identity=user.id, fresh=True)
            )
            res = user
        # The token is on the stack from here on, so popping it is safe.
        try:
            yield res
        finally:
            _TOKENS.pop(-1)
            setattr(ctx_stack.top, 'jwt_user', None)

    try:
        yield _login
    finally:
        _TOKENS.clear()
def test_weak_keys_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_keyed_dict().
    dict, objects = self.make_weak_keyed_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_keyed_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            v = objects.pop().arg
            gc.collect()      # just in case
            # Hand back a new key object plus the value it should map to.
            yield Object(v), v
        finally:
            it = None           # should commit all removals
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_valued_dict().
    dict, objects = self.make_weak_valued_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
    self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_valued_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            k = objects.pop().arg
            gc.collect()      # just in case
            # Hand back the key plus a new value object built from it.
            yield k, Object(k)
        finally:
            it = None           # should commit all removals
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_keys_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_keyed_dict().
    dict, objects = self.make_weak_keyed_dict()
    # Run the shared helper once per (Python 2 style) iterator method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
    self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
    self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_keyed_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.iteritems())
            next(it)
            # Schedule a key/value for removal and recreate it
            v = objects.pop().arg
            gc.collect()      # just in case
            # Hand back a new key object plus the value it should map to.
            yield Object(v), v
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_valued_dict().
    dict, objects = self.make_weak_valued_dict()
    # Run the shared helper once per (Python 2 style) iterator method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
    self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_valued_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.iteritems())
            next(it)
            # Schedule a key/value for removal and recreate it
            k = objects.pop().arg
            gc.collect()      # just in case
            # Hand back the key plus a new value object built from it.
            yield k, Object(k)
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def redirect_stderr(x):
    """Return a context manager that redirects ``sys.stderr`` to ``x``.

    This is some compatibility code to support Python 3.4: the standard
    ``contextlib.redirect_stderr`` is used when it exists, otherwise a
    minimal stand-in is built.

    :param x: File-like object that receives everything written to stderr
        while the context is active.
    :return: A context manager; entering it yields ``x``.
    """
    if hasattr(contextlib, 'redirect_stderr'):
        result = contextlib.redirect_stderr
    else:
        @contextlib.contextmanager
        def result(x):
            """
            Stand-in for Python 3.5's `redirect_stderr`.

            Notes: Non-reentrant, non-threadsafe
            """
            old_stderr = sys.stderr
            sys.stderr = x
            try:
                # Yield the target, matching the stdlib implementation.
                yield x
            finally:
                # Restore even when the body raises.
                sys.stderr = old_stderr

    return result(x)


###############################################################################
def setup_with_context_manager(testcase, cm):
    """Enter ``cm`` now and register its exit as a cleanup on ``testcase``.

    This lets a context manager stay active from ``setUp`` until the
    test's cleanups run, instead of only for a ``with`` statement.

    So instead of:

        with ctxmgr(a, b, c) as v:
            # do something with v that only persists for the `with` statement

    use:

        def setUp(self):
            self.v = setup_with_context_manager(self, ctxmgr(a, b, c))

        def test_foo(self):
            # do something with self.v

    Returns whatever ``cm.__enter__()`` returns.
    """
    entered = cm.__enter__()
    # The cleanup always reports "no exception" to the manager.
    no_exception = (None, None, None)
    testcase.addCleanup(cm.__exit__, *no_exception)
    return entered
def open_expiring(filename, at, unless_modified=True, unless_accessed=True, *args):
    """Yield an open file and schedule the file for expiry afterwards.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  Expiry is scheduled only when the
    block using the file, and the closing of the file, finish without
    raising; any exception propagates unchanged and nothing is scheduled.

    :param str filename: The file to open and schedule for expiry.
    :param at: A datetime object or string as recognized by the `at`
        command.
    :param bool unless_modified: Whether a condition has to be added to the
        job expiry script to expire the path only if it has not been
        modified since it was scheduled for expiry. Default: True
    :param bool unless_accessed: Whether a condition has to be added to the
        job expiry script to expire the path only if it has not been
        accessed since it was scheduled for expiry. Default: True
    :param *args: Any additional arguments to be passed on to the `open`
        builtin.
    :return: An open file object.
    :rtype: file
    """
    with open(filename, *args) as fd:
        yield fd
    # Reached only on the success path; an exception skips this line.
    expire_path(filename, at, unless_modified, unless_accessed)
def test_weak_keys_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_keyed_dict().
    dict, objects = self.make_weak_keyed_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_keyed_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            v = objects.pop().arg
            gc.collect()      # just in case
            # Hand back a new key object plus the value it should map to.
            yield Object(v), v
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
    # Issue #21173: len() fragile when keys are both implicitly and
    # explicitly removed.
    # testcontext reads `dict` and `objects` through its closure, so
    # rebinding them here makes it operate on the new pair.
    dict, objects = self.make_weak_keyed_dict()
    self.check_weak_del_and_len_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_valued_dict().
    dict, objects = self.make_weak_valued_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
    self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_valued_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            k = objects.pop().arg
            gc.collect()      # just in case
            # Hand back the key plus a new value object built from it.
            yield k, Object(k)
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
    # testcontext reads `dict` and `objects` through its closure, so
    # rebinding them here makes it operate on the new pair.
    dict, objects = self.make_weak_valued_dict()
    self.check_weak_del_and_len_while_iterating(dict, testcontext)
def ViewMethod(w, tname, visibility, out_tname, name, params=(), throws=(), override=False):
    # Generator with a single yield, written to be driven by
    # contextlib.contextmanager (decorator not visible in this excerpt).
    # Opens a Method(...) block on writer `w` and yields `view`, a nested
    # context manager that emits a "return new <View>(model) {" statement.
    # NOTE(review): the nesting below was reconstructed from token order in
    # a whitespace-collapsed source; confirm that the closing w('}};') sits
    # inside the Method block, after `yield view`.

    # Use the bare 'View' name when the output type is the type itself.
    view_tname = 'View' if tname == out_tname else out_tname + '.View'
    with Method(w, visibility, view_tname, name, params, throws, override):
        @contextlib.contextmanager
        def view():
            w('return new {}(model) {{', view_tname)
            with w.indent():
                @contextlib.contextmanager
                def enumerator_method(src_var=None):
                    # `source` is falsy when no src_var is given, otherwise
                    # the triple (src_var, 'View.this', tname).
                    with EnumeratorMethod(w, 'public', out_tname, source=src_var and (src_var, 'View.this', tname)) as enumerator:
                        yield enumerator
                yield enumerator_method
        yield view
        # Closes the anonymous class body opened by view().
        w('}};')
def check_flake8(file_name, contents):
    """Yield ``(line_num, message)`` for each flake8 finding in ``contents``.

    Files whose name does not end in ``.py`` produce no findings.  flake8
    is imported lazily, only when there is something to check.

    :param file_name: Name of the file; only used for the extension test.
    :param contents: Source text handed to flake8's ``check_code``.
    :return: Generator of ``(line_num, message)`` pairs, where ``line_num``
        is the string taken from flake8's output line.
    """
    if not file_name.endswith(".py"):
        # A bare return ends the generator cleanly.  Raising StopIteration
        # here would surface as RuntimeError under PEP 479.
        return

    from flake8.main import check_code

    @contextlib.contextmanager
    def stdout_redirect(where):
        # Restore whatever stdout was active, not necessarily the
        # interpreter's original stream.
        previous = sys.stdout
        sys.stdout = where
        try:
            yield where
        finally:
            sys.stdout = previous

    ignore = {
        "W291",  # trailing whitespace; the standard tidy process will enforce no trailing whitespace
        "E501",  # 80 character line length; the standard tidy process will enforce line length
    }

    # check_code reports by printing, so capture stdout and parse it.
    output = StringIO.StringIO()
    with stdout_redirect(output):
        check_code(contents, ignore=ignore)
    for error in output.getvalue().splitlines():
        # Each line is split on the first three ':' into four fields.
        _, line_num, _, message = error.split(":", 3)
        yield line_num, message.strip()
def z_add_owned_piece(self, p):
    """Add piece ``p`` to ``self.owned_pieces`` and call ``self._update()``.

    The ``#freeze:`` lines are disabled code kept from the original.
    """
    #freeze:wasfrozen = False
    #freeze:if self.isFrozen:
    #freeze:    wasfrozen = True
    #freeze:    self.unfreeze()
    pieces = self.owned_pieces
    pieces.add(p)
    self._update()
    #freeze:if wasfrozen:
    #freeze:    self.freeze()

#freeze:def freeze(self):
#freeze:    self.isFrozen = True
#freeze:    self.owned_pieces = list(self.owned_pieces)

#freeze:def unfreeze(self):
#freeze:    self.isFrozen = False
#freeze:    self.owned_pieces = set(self.owned_pieces)

#freeze:@contextlib.contextmanager
#freeze:def frozen(self):
#freeze:    self.freeze()
#freeze:    yield
#freeze:    self.unfreeze()
def set_game(self, g):
    """Store ``g`` as ``self.game`` and pass it on to ``self.cfg.set_game``.

    The ``#freeze:`` lines below are disabled code kept from the original.
    """
    self.game = g
    self.cfg.set_game(g)

#freeze:def freeze(self):
#freeze:    """Lock the board in place."""
#freeze:    self.isfrozen = True
#freeze:    #self.pieces = list(self.pieces)
#freeze:    for player in self.players:
#freeze:        player.freeze()

#freeze:def unfreeze(self):
#freeze:    """Unlock the board."""
#freeze:    self.isfrozen = False
#freeze:    #self.pieces = set(self.pieces)
#freeze:    for player in self.players:
#freeze:        player.unfreeze()

#freeze:@contextlib.contextmanager
#freeze:def frozen(self):
#freeze:    self.freeze()
#freeze:    yield
#freeze:    self.unfreeze()
def singleton_contextmanager(func):
    """Wrap generator ``func`` in a shared, reference-counted context manager.

    The returned object can be entered repeatedly, including nested.  The
    underlying context (``contextmanager(func)()``) is entered on the first
    ``__enter__`` and exited when the matching last ``__exit__`` runs;
    entries in between only adjust a counter guarded by an ``RLock``.

    Every ``__enter__`` returns the object produced by the underlying
    context, so ``with cm as obj`` works at any nesting depth.

    :param func: Generator function taking no arguments, suitable for
        ``contextlib.contextmanager``.
    :return: A reusable context-manager instance.
    """
    class CtxManager():
        def __init__(self, func):
            self.count = 0
            self.func_cm = contextmanager(func)
            self._lock = RLock()

        def __enter__(self):
            with self._lock:
                if self.count == 0:
                    # First user: actually open the underlying context.
                    self.ctm = self.func_cm()
                    self.obj = self.ctm.__enter__()
                self.count += 1
                return self.obj

        def __exit__(self, *args):
            with self._lock:
                self.count -= 1
                if self.count > 0:
                    # Other users remain; keep the underlying context open.
                    return
                # Prefer the exception details handed to us; fall back to
                # the ambient ones for callers that invoke __exit__() bare.
                exc_info = args if len(args) == 3 else sys.exc_info()
                try:
                    # Propagate the inner manager's suppress/propagate verdict.
                    return self.ctm.__exit__(*exc_info)
                finally:
                    # Drop references even if the inner exit raises.
                    del self.ctm
                    del self.obj

    return CtxManager(func)
def singleton_contextmanager_method(func):
    """Method decorator giving each instance one shared context manager.

    On first use for an instance, ``singleton_contextmanager`` is applied
    to ``func`` bound to that instance and the result is cached on the
    instance under an attribute derived from ``func.__name__``.  Later
    calls reuse the cached manager.  Cache lookup and creation happen
    under ``_singleton_contextmanager_method_attr_lock``.
    """
    slot = '__singleton_contextmanager_method__' + func.__name__

    # Wrap with a context manager to get proper IPython documentation
    @contextmanager
    @wraps(func)
    def inner(self):
        with _singleton_contextmanager_method_attr_lock:
            try:
                shared = getattr(self, slot)
            except AttributeError:
                shared = singleton_contextmanager(partial(func, self))
                setattr(self, slot, shared)
        # The lock is released before the shared context is entered.
        with shared as value:
            yield value

    return inner
def open(filename, **credentials):
    """
    A contextmanager to open the KeePass file with `filename`. Use a
    `password` and/or `keyfile` named argument for decryption.

    Files are identified using their signature and a reader suitable for
    the file format is initialized and returned.

    Generator with a single ``yield`` (the ``contextmanager`` decorator is
    not visible in this excerpt).  The reader is closed whether or not the
    caller's block raises, and before the underlying stream is closed.

    Note: `keyfile` is currently not supported for v3 KeePass files.
    """
    with io.open(filename, 'rb') as stream:
        signature = common.read_signature(stream)
        cls = get_kdb_reader(signature)
        kdb = cls(stream, **credentials)
        # One cleanup path for both success and failure; exceptions
        # propagate unchanged.
        try:
            yield kdb
        finally:
            kdb.close()
def test_weak_keys_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_keyed_dict().
    dict, objects = self.make_weak_keyed_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_keyed_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            v = objects.pop().arg
            gc.collect()      # just in case
            # Hand back a new key object plus the value it should map to.
            yield Object(v), v
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # NOTE: the local name `dict` shadows the builtin for the rest of this
    # test; it is the mapping returned by make_weak_valued_dict().
    dict, objects = self.make_weak_valued_dict()
    # Run the shared helper once per iteration method name.
    self.check_weak_destroy_while_iterating(dict, objects, 'keys')
    self.check_weak_destroy_while_iterating(dict, objects, 'items')
    self.check_weak_destroy_while_iterating(dict, objects, 'values')
    self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
    self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
    # Fresh mapping for the destroy-and-mutate check below.
    dict, objects = self.make_weak_valued_dict()
    @contextlib.contextmanager
    def testcontext():
        try:
            # Keep a started iterator alive while an entry goes away.
            it = iter(dict.items())
            next(it)
            # Schedule a key/value for removal and recreate it
            k = objects.pop().arg
            gc.collect()      # just in case
            # Hand back the key plus a new value object built from it.
            yield k, Object(k)
        finally:
            it = None           # should commit all removals
            gc.collect()
    self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def run_xz_decompress(stdout, stderr=None):
    """Start ``xz --decompress --stdout`` and yield the process object.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  The process reads from a pipe
    (``proc.stdin``) and writes to ``stdout`` / ``stderr`` as given.  On
    exit the pipe is closed and the process is waited for.

    Raises:
        OSError: if no process object was obtained from ``Popen``.
        IOError: if xz exits with a non-zero status.
    """
    command = ["xz", "--decompress", "--stdout"]
    proc = None
    try:
        proc = subprocess.Popen(command,
                                stdin=subprocess.PIPE,
                                stdout=stdout,
                                stderr=stderr)
        yield proc
    finally:
        if not proc:
            raise OSError("You must have an 'xz' binary in PATH.")

        # Closing stdin lets xz see end-of-input so wait() can return.
        proc.stdin.close()
        exit_code = proc.wait()
        if exit_code != 0:
            raise IOError("xz --decompress returned %d." % exit_code)
def query2(self, query, bindata=None):
    """
    Variant of the query method for use with the python 'with' statement.
    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``: it yields the cursor returned by
    ``self.query`` and closes that cursor (when it is truthy) once the
    'with' block is left, whether normally or through an exception.
    Example:

    >> with self.console.storage.query(query) as cursor:
    >>     if not cursor.EOF:
    >>         cursor.getRow()
    >>         ...

    :param query: The query to execute.
    :param bindata: Data to bind to the given query.
    """
    # If the query itself fails there is no cursor to close.
    handle = self.query(query, bindata)
    try:
        yield handle
    finally:
        if handle:
            handle.close()
def reconfigure(ns=namespace, **kwargs):
    """Temporarily apply ``kwargs`` as configuration overrides in ``ns``.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  On exit the namespace is cleared and
    rebuilt from its values at that moment, with every overridden key
    reset to the value it had on entry; overridden keys that were absent
    on entry are dropped.

    Args:
        ns: Namespace of the conf
    """
    conf_namespace = staticconf.config.get_namespace(ns)

    # Remember the pre-override value of every key we are about to touch.
    saved = {}
    for key, value in conf_namespace.get_config_values().iteritems():
        if key in kwargs:
            saved[key] = value

    staticconf.DictConfiguration(kwargs, namespace=ns)
    try:
        yield
    finally:
        # Keep whatever else is configured now, then put the saved values back.
        restored = {}
        for key, value in conf_namespace.get_config_values().iteritems():
            if key not in kwargs:
                restored[key] = value
        restored.update(saved)
        staticconf.config.get_namespace(ns).clear()
        staticconf.DictConfiguration(restored, namespace=ns)
def _register_signal_handlers(self):
    """Install the shutdown and profiler signal handlers around a block.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``.  SIGINT and SIGTERM are routed to
    ``self._handle_shutdown_signal``; SIGUSR2 is routed to
    ``self._handle_profiler_signal``, which will toggle a profiler on and
    off.  On exit SIGUSR2 is reset to its default action and a running
    profiler is disabled.  The SIGINT and SIGTERM handlers are left as is.
    """
    install = signal.signal
    try:
        install(signal.SIGINT, self._handle_shutdown_signal)
        install(signal.SIGTERM, self._handle_shutdown_signal)
        install(signal.SIGUSR2, self._handle_profiler_signal)
        yield
    finally:
        # Cleanup for the profiler signal handler has to happen here,
        # because signals that are handled don't unwind up the stack in the
        # way that normal methods do.  Any contextmanager or finally
        # statement won't live past the handler function returning.
        install(signal.SIGUSR2, signal.SIG_DFL)
        if self._profiler_running:
            self._disable_profiler()
def test_create_without_pause(self):
    # With virt_type 'lxc', _create_domain_and_network must call
    # _create_domain with pause=0 and must not resume the domain.
    self.flags(virt_type='lxc', group='libvirt')

    @contextlib.contextmanager
    def fake_lxc_disk_handler(*args, **kwargs):
        # Stand-in that does nothing around the guarded block.
        yield

    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
    instance = objects.Instance(**self.test_instance)

    # The names in the target tuple follow the order of the patches above
    # them; only `create` is inspected afterwards.
    with test.nested(
            mock.patch.object(drvr, '_lxc_disk_handler',
                              side_effect=fake_lxc_disk_handler),
            mock.patch.object(drvr, 'plug_vifs'),
            mock.patch.object(drvr, 'firewall_driver'),
            mock.patch.object(drvr, '_create_domain'),
            mock.patch.object(drvr, 'cleanup')) as (
                _handler, plug_vifs, firewall_driver, create, cleanup):
        domain = drvr._create_domain_and_network(self.context, 'xml',
                                                 instance, None, None)
        self.assertEqual(0, create.call_args_list[0][1]['pause'])
        self.assertEqual(0, domain.resume.call_count)
def test_contextmanager_finally(self):
    # An exception raised in the with-body must run the generator's
    # `finally` clause and still propagate to the caller.
    state = []
    @contextmanager
    def woohoo():
        state.append(1)
        try:
            yield 42
        finally:
            # Recorded after the body's own append, before propagation.
            state.append(999)
    with self.assertRaises(ZeroDivisionError):
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError()
    # Order proves: enter, body, then the generator's finally.
    self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except(self):
    # An exception raised in the with-body is thrown into the generator at
    # its yield; catching it there suppresses it for the caller.
    state = []
    @contextmanager
    def woohoo():
        state.append(1)
        try:
            yield 42
        except ZeroDivisionError as e:
            # The handler sees the exact exception raised by the body.
            state.append(e.args[0])
            self.assertEqual(state, [1, 42, 999])
    with woohoo() as x:
        self.assertEqual(state, [1])
        self.assertEqual(x, 42)
        state.append(x)
        raise ZeroDivisionError(999)
    # Reached only because the generator swallowed the exception.
    self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except_pep479(self):
    # A StopIteration raised inside the with-body must come back out as
    # the very same exception object, not be swallowed by the generator
    # machinery, when the generator module opts in to generator_stop.
    # NOTE(review): the line breaks inside `code` were restored from a
    # whitespace-collapsed source; confirm against the upstream file.
    code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
    # Compile the snippet in its own namespace so the __future__ import
    # applies only to it.
    locals = {}
    exec(code, locals, locals)
    woohoo = locals['woohoo']

    stop_exc = StopIteration('spam')
    try:
        with woohoo():
            raise stop_exc
    except Exception as ex:
        self.assertIs(ex, stop_exc)
    else:
        self.fail('StopIteration was suppressed')
def test_contextmanager_as_decorator(self):
    # A @contextmanager object used as a function decorator must wrap each
    # call of the decorated function in a fresh pass through the generator.
    @contextmanager
    def woohoo(y):
        # `state` is looked up at call time, so the rebinding below is seen.
        state.append(y)
        yield
        state.append(999)

    state = []
    @woohoo(1)
    def test(x):
        self.assertEqual(state, [1])
        state.append(x)
    test('something')
    self.assertEqual(state, [1, 'something', 999])

    # Issue #11647: Ensure the decorated function is 'reusable'
    state = []
    test('something else')
    self.assertEqual(state, [1, 'something else', 999])

# Detailed exception chaining checks only make sense on Python 3
def setup_with_context_manager(testcase, cm):
    """Start ``cm`` for a test case and end it via the case's cleanups.

    If you have a context manager you like::

        with ctxmgr(a, b, c) as v:
            # do something with v

    and you want to have that effect for a test case, call this function
    from your setUp.  It enters the context manager immediately and
    registers its exit, with no exception information, as a cleanup on
    ``testcase``::

        def setUp(self):
            self.v = setup_with_context_manager(self, ctxmgr(a, b, c))

        def test_foo(self):
            # do something with self.v

    Returns the value produced by entering ``cm``.
    """
    result = cm.__enter__()
    leave = cm.__exit__
    testcase.addCleanup(leave, None, None, None)
    return result
def contextmanager(fn):
    """Placeholder for ``contextlib.contextmanager``.

    Returns a function carrying ``fn``'s name that accepts any arguments
    and raises ``RuntimeError`` when called, instead of producing a
    context manager.
    """
    def oops(*args, **kw):
        message = "Python 2.5 or above is required to use context managers."
        raise RuntimeError(message)

    # Keep the original name so the stub is recognisable in tracebacks.
    oops.__name__ = fn.__name__
    return oops
def observation(observe, notify):
    """Yield an ``Observation(observe, notify)`` and close it afterwards.

    Generator with a single ``yield``, written to be driven by
    ``contextlib.contextmanager``, which supplies the ``__enter__`` and
    ``__exit__`` special methods needed by the 'with' statement.  The
    observation is closed whether or not the block raises.
    """
    link = Observation(observe, notify)
    try:
        yield link
    finally:
        link.close()
def named_context(self, name):
    """Return a context-manager factory that tracks ``name`` on a stack.

    Entering the context pushes ``name`` onto ``self.named_contexts``.
    Leaving it pops the top entry and asserts, through
    ``self.assertEqual``, that the popped entry is ``name``.
    """
    stack = self.named_contexts

    @contextlib.contextmanager
    def context():
        stack.append(name)
        try:
            yield
        finally:
            popped = stack.pop()
            self.assertEqual(popped, name)

    return context
def _validContext(func):
    # Decorator for methods that need an open context: the wrapped call
    # runs inside a reference-counted section and is skipped when
    # self.__context_p is falsy.
    # Defined inside USBContext so we can access "self.__*".
    @contextlib.contextmanager
    def refcount(self):
        with self.__context_cond:
            if not self.__context_p and self.__auto_open:
                # BBB
                warnings.warn(
                    'Use "with USBContext() as context:" for safer cleanup'
                    ' on interpreter shutdown. See also USBContext.open().',
                    DeprecationWarning,
                )
                self.open()
            self.__context_refcount += 1
        try:
            yield
        finally:
            with self.__context_cond:
                self.__context_refcount -= 1
                if not self.__context_refcount:
                    # Last user is gone: wake anything waiting on the
                    # condition.
                    self.__context_cond.notifyAll()
    if inspect.isgeneratorfunction(func):
        # Generator methods hold the refcount for as long as the returned
        # generator is being consumed.
        def wrapper(self, *args, **kw):
            with refcount(self):
                if self.__context_p:
                    # pylint: disable=not-callable
                    for value in func(self, *args, **kw):
                        # pylint: enable=not-callable
                        yield value
    else:
        # Plain methods return None when there is no context.
        def wrapper(self, *args, **kw):
            with refcount(self):
                if self.__context_p:
                    # pylint: disable=not-callable
                    return func(self, *args, **kw)
                    # pylint: enable=not-callable
    functools.update_wrapper(wrapper, func)
    return wrapper
# pylint: enable=no-self-argument,protected-access