我们从 Python 开源项目中提取了以下 46 个代码示例,用于说明如何使用 contextlib.ExitStack()。
def _predict(predictors: Dict[str, str]): def predict_inner(args: argparse.Namespace) -> None: predictor = _get_predictor(args, predictors) output_file = None if args.silent and not args.output_file: print("--silent specified without --output-file.") print("Exiting early because no output will be created.") sys.exit(0) # ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist with ExitStack() as stack: input_file = stack.enter_context(args.input_file) # type: ignore if args.output_file: output_file = stack.enter_context(args.output_file) # type: ignore _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device) return predict_inner
def adversarial_discriminator(net, layers, scope='adversary', leaky=False):
    """Stack fully connected layers on ``net``, ending in a 2-unit layer.

    :param net: Input passed to the first ``slim.fully_connected`` call.
    :param layers: Iterable of layer widths, one hidden layer per entry.
    :param scope: Name given to the enclosing ``tf.variable_scope``.
    :param leaky: Use tflearn's leaky ReLU for hidden layers instead of
        ``tf.nn.relu``.
    :return: Output of the final 2-unit layer, which has no activation.
    """
    activation_fn = tflearn.activations.leaky_relu if leaky else tf.nn.relu

    with ExitStack() as scopes:
        scopes.enter_context(tf.variable_scope(scope))
        # Every fully connected layer below inherits these defaults unless
        # it overrides them explicitly.
        layer_defaults = slim.arg_scope(
            [slim.fully_connected],
            activation_fn=activation_fn,
            weights_regularizer=slim.l2_regularizer(2.5e-5))
        scopes.enter_context(layer_defaults)

        for width in layers:
            net = slim.fully_connected(net, width)
        net = slim.fully_connected(net, 2, activation_fn=None)
    return net
def search(timeout):
    '''
    Search the network for devices implementing WANCommonInterfaceConfig.

    One IPv6 and one IPv4 UDP socket are opened and searched concurrently,
    each on its own worker thread. The search ends the specified number of
    seconds after the last result (if any) was received.

    Returns an iterator of root device URLs.
    '''
    families = (socket.AF_INET6, socket.AF_INET)
    with contextlib.ExitStack() as stack:
        sockets = [
            stack.enter_context(
                socket.socket(family, socket.SOCK_DGRAM, socket.IPPROTO_UDP))
            for family in families
        ]

        for sock in sockets:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if sock.family == socket.AF_INET6:
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        with concurrent.futures.ThreadPoolExecutor(len(sockets)) as pool:
            per_socket_results = pool.map(
                lambda s: search_socket(s, timeout, ns['i']), sockets)
            return itertools.chain.from_iterable(per_socket_results)
def write_split_emote(cls):
    """Split ``emote/all.txt`` into per-class files based on emoticons.

    Lines matching the retweet pattern are skipped. For the rest, each
    emoticon match is tallied into one of three buckets via
    ``Emoticons.assess_match``; a line is written (with emoticons replaced by
    spaces and surrounding whitespace stripped) only when exactly one of the
    outer buckets is non-empty.
    """
    retweet_pattern = re.compile(r'RT\s*"?[@?][a-zA-Z0-9_]+:?')
    with open('emote/all.txt', encoding='utf-8') as source, ExitStack() as stack:
        writers = {}
        for name in ['pos', 'neg']:
            path = 'emote/class_{}.txt'.format(name)
            writers[name] = stack.enter_context(
                open(path, 'w', encoding='utf-8'))

        for line in source:
            if retweet_pattern.search(line):
                continue

            tally = [0, 0, 0]
            for match in cls.emoticon_re.finditer(line):
                tally[Emoticons.assess_match(match)] += 1

            only_first = tally[0] > 0 and tally[1] == 0 and tally[2] == 0
            only_last = tally[0] == 0 and tally[1] == 0 and tally[2] > 0
            if only_first:
                label = 0
            elif only_last:
                label = 2
            else:
                continue

            # NOTE(review): `writers` is keyed by 'pos'/'neg' but indexed
            # here with 0/2, which looks like it raises KeyError -- confirm
            # the intended label-to-file mapping.
            cleaned = cls.emoticon_re.sub(' ', line).strip()
            writers[label].write(cleaned + '\n')
def post(url: str, data: object, file_paths: List[str] = None, username: str="", password: str=""):
    """POST ``data`` as JSON to ``url``, optionally with files and basic auth.

    :param url: Target URL.
    :param data: Object serialized with ``json.dumps`` using
        ``JSONFloatEncoder``.
    :param file_paths: Optional paths to upload. When given, the JSON payload
        is sent as the ``data`` form field alongside the files.
    :param username: If non-empty, credentials are attached to the request.
    :param password: Password for ``username``; prompted for interactively
        when a username is given without one.
    :raises: Whatever ``response.raise_for_status()`` raises for error
        responses.
    """
    payload = json.dumps(data, cls=JSONFloatEncoder)
    request = {"url": url, "data": payload}

    if username:
        password = password or getpass.getpass(
            "Enter password for '{}': ".format(username))
        request["auth"] = (username, password)

    # The stack is handed to the file-tuple helper for each path --
    # presumably so opened files stay alive until the request completes.
    with ExitStack() as es:
        if file_paths:
            request["data"] = {"data": request["data"]}
            request["files"] = [
                __create_file_tuple(path, es) for path in file_paths
            ]
        response = requests.post(**request)
        response.raise_for_status()
def test_exit_stack_exception_propagate():
    # An exception raised inside the stack must propagate unchanged, and
    # every entered context must be closed with the exception details,
    # innermost first.
    first_handle, second_handle = mock.MagicMock(), mock.MagicMock()
    first_value, second_value = mock.MagicMock(), mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        with ExitStack() as stack:
            entered = stack.enter_context(
                AutoClose(first_handle, v=first_value))
            assert entered is first_value
            entered = stack.enter_context(
                AutoClose(second_handle, v=second_value))
            assert entered is second_value
            raise error

    assert exc.value is error
    second_handle.close.assert_called_once_with(ValueError, error, mock.ANY)
    first_handle.close.assert_called_once_with(ValueError, error, mock.ANY)
def __call__(self):
    """Parse arguments, run the command and exit with its status.

    The exit status starts at 1 and is only replaced once ``self.run``
    returns; a falsy return value from ``run`` is reported as 0.
    """
    status = 1
    with log_exception(status=1):
        args = self.parser.parse_args()
        log_args(args)
        config.log_cached()
        logger = getLogger('django')

        # Both guards are optional, so they are entered conditionally on a
        # shared stack and released together once run() finishes.
        with ExitStack() as guards:
            if self._pid_file:
                guards.enter_context(pid_file(
                    dirname=config.PID_DIR,
                    max_age=self._pid_file_max_age))
            if self._stoppable:
                self._stoppable_instance = stoppable()
                guards.enter_context(self._stoppable_instance)
            result = self.run(args, logger)
            status = result or 0
    sys.exit(status)
def setUp(self):
    super().setUp()
    # Every patch below is registered on one stack, which is closed when
    # the test finishes.
    self._resources = ExitStack()
    self.addCleanup(self._resources.close)
    # Buffers standing in for argparse's stdout and stderr; stderr matters
    # because that is where argparse writes its diagnostics.
    self._stdout = StringIO()
    self._stderr = StringIO()
    stdout_patch = patch('argparse._sys.stdout', self._stdout)
    stderr_patch = patch('argparse._sys.stderr', self._stderr)
    self._resources.enter_context(stdout_patch)
    self._resources.enter_context(stderr_patch)
    # Replace logging.basicConfig as seen by ubuntu_image.__main__ with a
    # mock for the duration of the test.
    self._resources.enter_context(
        patch('ubuntu_image.__main__.logging.basicConfig'))
    # Paths to the packaged test data used by these tests.
    data_package = 'ubuntu_image.tests.data'
    self.model_assertion = resource_filename(
        data_package, 'model.assertion')
    self.classic_gadget_tree = resource_filename(
        data_package, 'gadget_tree')
def test_duplicate_volume_name(self):
    # The volume name `first` is defined twice; parsing must fail and name
    # the duplicated key.
    gadget_yaml = """\
volumes:
  first:
    bootloader: u-boot
    structure:
        - name: one
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  second:
    structure:
        - name: two
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  first:
    structure:
        - name: three
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception), 'Duplicate key: first')
def test_bad_hybrid_volume_type_2(self):
    # A hybrid type made of two GUIDs joined by a comma is rejected.  The
    # backslash keeps both halves on one logical YAML line.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef,\
00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Invalid gadget.yaml @ volumes:first-image:structure:0:type')
def test_volume_filesystem_bad(self):
    # `zfs` is not an accepted filesystem value.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: zfs
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ("Invalid gadget.yaml value 'zfs' @ "
         'volumes:<volume name>:structure:<N>:filesystem'))
def test_volume_structure_role_system_data_bad_label(self):
    # A system-data structure with the explicit label `foobar` is rejected.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000feedface
          size: 200
          role: system-data
          filesystem-label: foobar
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ('`role: system-data` structure must have an implicit label, '
         "or 'writable': foobar"))
def test_volume_structure_type_role_conflict_1(self):
    # A `type: bare` structure has no partition, so giving it the
    # system-boot role is a role/type conflict.
    gadget_yaml = """
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          size: 100
          role: mbr
  second-image:
    structure:
        - type: bare
          size: 200
          role: system-boot
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Invalid gadget.yaml: structure role/type conflict')
def test_volume_structure_type_role_conflict_2(self):
    # A `type: bare` structure has no partition, so giving it the
    # system-data role is a role/type conflict.
    gadget_yaml = """
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          size: 100
          role: mbr
  second-image:
    structure:
        - type: bare
          size: 200
          role: system-data
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Invalid gadget.yaml: structure role/type conflict')
def test_volume_structure_mbr_conflicting_id(self):
    # A structure with the mbr role may not also carry a partition id.
    gadget_yaml = """\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          role: mbr
          size: 100
          id: 00000000-0000-0000-0000-0000deadbeef
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'mbr structures must not specify partition id')
def test_volume_structure_mbr_conflicting_filesystem(self):
    # A structure with the mbr role may not also name a filesystem.
    gadget_yaml = """\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          role: mbr
          size: 100
          filesystem: ext4
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'mbr structures must not specify a file system')
def test_volume_special_type_mbr_and_role(self):
    # Using `type: mbr` together with a role field is rejected with a
    # message pointing at the mbr role.
    gadget_yaml = """\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: mbr
          role: mbr
          size: 100
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Type mbr and role fields assigned at the same time, please use '
        'the mbr role instead')
def test_volume_special_type_mbr_and_filesystem(self):
    # A `type: mbr` structure may not name a filesystem.
    gadget_yaml = """\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: mbr
          size: 100
          filesystem: ext4
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'mbr structures must not specify a file system')
def test_bad_schema(self):
    # `bad` is not an accepted volume schema.
    gadget_yaml = """\
volumes:
  first-image:
    schema: bad
    bootloader: u-boot
    structure:
        - type: ef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        "Invalid gadget.yaml value 'bad' @ volumes:<volume name>:schema")
def test_mbr_with_bogus_type(self):
    # `801` is rejected as a structure type in an mbr-schema volume.
    gadget_yaml = """\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: 801
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Invalid gadget.yaml @ volumes:first-image:structure:0:type')
def test_bad_bootloader(self):
    # `u-boat` is not an accepted bootloader value.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boat
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ("Invalid gadget.yaml value 'u-boat' @ "
         'volumes:<volume name>:bootloader'))
def test_missing_bootloader_multiple_volumes(self):
    # None of the three volumes names a bootloader, which is an error.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  second-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  third-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception), 'No bootloader structure named')
def test_bad_volume_id(self):
    # `3g` is rejected as a volume id.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    id: 3g
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'Invalid gadget.yaml @ volumes:first-image:id')
def test_bad_integer_volume_id(self):
    # The integer `801` is rejected as a volume id.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    id: 801
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'Invalid gadget.yaml @ volumes:first-image:id')
def test_disallow_hybrid_volume_id(self):
    # A hybrid (MBR code plus GUID) value is rejected as a structure id.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          id: 80,00000000-0000-0000-0000-0000deadbeef
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        'Invalid gadget.yaml @ volumes:first-image:structure:0:id')
def test_volume_offset_write_relative_syntax_error(self):
    # `some_label%2112` is rejected as an offset-write value.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: some_label%2112
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ('Invalid gadget.yaml @ '
         'volumes:first-image:structure:0:offset-write'))
def test_volume_offset_write_larger_than_32bit(self):
    # An offset-write of 8G does not fit in 32 bits and is rejected.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: 8G
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ('Invalid gadget.yaml @ '
         'volumes:first-image:structure:0:offset-write'))
def test_volume_offset_write_is_4G(self):
    # Boundary case: 4GiB is the first value just outside 32 bits, so it
    # must be rejected too.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: 4G
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(
        str(caught.exception),
        ('Invalid gadget.yaml @ '
         'volumes:first-image:structure:0:offset-write'))
def test_content_spec_b_offset_write_is_4G(self):
    # Boundary case for a content entry: 4GiB is just outside 32 bits.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          content:
          - image: foo.img
            offset-write: 4G
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    # XXX https://github.com/alecthomas/voluptuous/issues/239
    # The final path component of the message may be either key, so only
    # the prefix is compared exactly.
    message = str(caught.exception)
    front, colon, end = message.rpartition(':')
    self.assertEqual(
        front,
        'Invalid gadget.yaml @ volumes:first-image:structure:0:content:0')
    self.assertIn(end, ['offset-write', 'image'])
def test_wrong_content_1(self):
    # With `filesystem: none`, a source/target content entry is rejected
    # because no image file name is given.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: none
          content:
          - source: subdir/
            target: /
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'filesystem: none missing image file name')
def test_wrong_content_2(self):
    # With `filesystem: ext4`, an image-only content entry is rejected
    # because source/target are missing.
    gadget_yaml = """\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: ext4
          content:
          - image: foo.img
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'filesystem: vfat|ext4 missing source/target')
def test_mbr_structure_not_at_offset_zero_explicit(self):
    # An mbr-role structure given an explicit non-zero offset is rejected.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - role: mbr
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 446
          offset: 10
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'mbr structure must start at offset 0')
def test_mbr_structure_not_at_offset_zero_implicit(self):
    # The mbr-role structure is listed after a 2M structure and has no
    # explicit offset; it is rejected for not starting at offset 0.
    gadget_yaml = """\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 2M
        - role: mbr
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 446
"""
    with ExitStack() as stack:
        caught = stack.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse(gadget_yaml)
    self.assertEqual(str(caught.exception),
                     'mbr structure must start at offset 0')
def test_mkfs_ext4(self):
    # (file name, payload) pairs placed in the contents directory.
    payloads = (('a.dat', b'01234'), ('b.dat', b'56789'))
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        results_dir = os.path.join(scratch, 'results')
        mounter = MountMocker(results_dir)
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mounter.run))
        # Populate a temporary contents directory.
        contents_dir = stack.enter_context(TemporaryDirectory())
        for name, payload in payloads:
            with open(os.path.join(contents_dir, name), 'wb') as fp:
                fp.write(payload)
        # A throwaway file stands in for the image.
        img_file = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(img_file, contents_dir)
        # helpers.run is mocked, so the files that went into the
        # "mountpoint" are checked in the mock's results copy instead.
        for name, payload in payloads:
            with open(os.path.join(mounter.results_dir, name), 'rb') as fp:
                self.assertEqual(fp.read(), payload)
def test_mkfs_ext4_no_contents(self):
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        results_dir = os.path.join(scratch, 'results')
        mounter = MountMocker(results_dir)
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mounter.run))
        # This time the contents directory is left empty.
        contents_dir = stack.enter_context(TemporaryDirectory())
        # A throwaway file stands in for the image.
        img_file = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(img_file, contents_dir)
        # With no contents the `sudo cp` was never called, so the mock's
        # shutil.copytree() never ran and results_dir was never created.
        self.assertFalse(os.path.exists(results_dir))
def test_mkfs_ext4_preserve_ownership(self):
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        results_dir = os.path.join(scratch, 'results')
        mounter = MountMocker(results_dir)
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mounter.run))
        # Populate a temporary contents directory with a single file.
        contents_dir = stack.enter_context(TemporaryDirectory())
        sample_path = os.path.join(contents_dir, 'a.dat')
        with open(sample_path, 'wb') as fp:
            fp.write(b'01234')
        # A throwaway file stands in for the image.
        img_file = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(img_file, contents_dir, preserve_ownership=True)
        copied_path = os.path.join(mounter.results_dir, 'a.dat')
        with open(copied_path, 'rb') as fp:
            self.assertEqual(fp.read(), b'01234')
        self.assertTrue(mounter.preserves_ownership)
def test_hook_fired(self):
    with ExitStack() as stack:
        hooksdir = stack.enter_context(TemporaryDirectory())
        hookfile = os.path.join(hooksdir, 'test-hook')
        resultfile = os.path.join(hooksdir, 'result')
        env = {'UBUNTU_IMAGE_TEST_ENV': 'true'}
        # The hook appends the value of the environment variable to the
        # result file, which shows both that it ran and what it was given.
        script = """\
#!/bin/sh
echo -n "$UBUNTU_IMAGE_TEST_ENV" >>{}
""".format(resultfile)
        with open(hookfile, 'w') as fp:
            fp.write(script)
        os.chmod(hookfile, 0o744)
        manager = HookManager([hooksdir])
        manager.fire('test-hook', env)
        # Exactly one 'true' in the file means the script ran once.
        self.assertTrue(os.path.exists(resultfile))
        with open(resultfile, 'r') as fp:
            self.assertEqual(fp.read(), 'true')
def test_hook_error(self):
    with ExitStack() as stack:
        hooksdir = stack.enter_context(TemporaryDirectory())
        hookfile = os.path.join(hooksdir, 'test-hook')
        # This hook writes to stderr and exits non-zero.
        script = dedent("""\
            #!/bin/sh
            echo -n "error" 1>&2
            exit 1
            """)
        with open(hookfile, 'w') as fp:
            fp.write(script)
        os.chmod(hookfile, 0o744)
        manager = HookManager([hooksdir])
        # A failing hook must surface as HookError carrying the details.
        with self.assertRaises(HookError) as caught:
            manager.fire('test-hook')
        failure = caught.exception
        self.assertEqual(failure.hook_name, 'test-hook')
        self.assertEqual(failure.hook_path, hookfile)
        self.assertEqual(failure.hook_retcode, 1)
        self.assertEqual(failure.hook_stderr, 'error')
def atomic(dst, encoding='utf-8'):
    """Open a temporary file for writing using the given encoding.

    The context manager returns an open file object, into which you can
    write text or bytes depending on the encoding it was opened with.  Upon
    exit, the temporary file is closed and then moved atomically to the
    destination, replacing any existing file.  If an exception occurs, the
    temporary file is removed and the destination is left untouched.

    :param dst: The path name of the target file.
    :param encoding: The encoding to use for the open file.  If None, then
        file is opened in binary mode.
    """
    # The temporary file lives next to the destination so that the final
    # move stays within one directory.
    directory = os.path.dirname(dst)
    mode = 'wb' if encoding is None else 'wt'
    with ExitStack() as resources:
        fp = resources.enter_context(NamedTemporaryFile(
            mode=mode, encoding=encoding, dir=directory, delete=False))
        try:
            yield fp
            # Flush and close before publishing, so the destination never
            # appears with partially written contents.
            fp.close()
            # Unlike os.rename(), os.replace() overwrites an existing
            # destination on every platform.
            os.replace(fp.name, dst)
        except BaseException:
            # delete=False means nothing else cleans up the temporary file,
            # so remove it here before propagating the failure.
            fp.close()
            try:
                os.remove(fp.name)
            except FileNotFoundError:
                pass
            raise
def update_changelog(repo, series, version):
    """Rewrite debian/changelog for ``series`` and return the new version.

    :param repo: Object whose ``working_dir`` contains ``debian/changelog``.
    :param series: Ubuntu series name; must be one of the keys in the table
        below, otherwise ``KeyError`` is raised.
    :param version: Base version; the result is
        ``<version>+<release number>ubuntu1``.
    :return: The new version string written to the changelog.
    """
    release_numbers = {
        'bionic': '18.04',
        'artful': '17.10',
        'zesty': '17.04',
        'xenial': '16.04',
        }
    debian_changelog = os.path.join(
        repo.working_dir, 'debian', 'changelog')
    with ExitStack() as stack:
        infp = stack.enter_context(
            open(debian_changelog, 'r', encoding='utf-8'))
        # The replacement is written through atomic() rather than directly
        # over the file that is being read.
        outfp = stack.enter_context(atomic(debian_changelog))
        changelog = Changelog(infp)
        changelog.distributions = series
        series_version = release_numbers[series]
        new_version = '{}+{}ubuntu1'.format(version, series_version)
        changelog.version = new_version
        changelog.write_to_open_file(outfp)
        return new_version
def __call__(self, f):
    """Wrap ``f`` so its fixture arguments are entered around the call.

    The wrapped function is also parametrized with pytest over
    ``self.fixture_permutations()``, using the argument names reported by
    ``self.fixture_arg_names``.
    """
    @wrapt.decorator
    def test_with_fixtures(wrapped, instance, args, kwargs):
        # Pick out the arguments that are fixtures, either by class or by
        # the scenario they carry.
        candidates = list(args) + list(kwargs.values())
        fixture_instances = [
            candidate for candidate in candidates
            if candidate.__class__ in self.fixture_classes
            or candidate.scenario in self.requested_fixtures
        ]
        ordered = self.topological_sort_instances(fixture_instances)

        # Python 2 has no ExitStack, so contextlib.nested is used there.
        if six.PY2:
            with contextlib.nested(*list(ordered)):
                return wrapped(*args, **kwargs)

        with contextlib.ExitStack() as stack:
            for fixture in ordered:
                stack.enter_context(fixture)
            return wrapped(*args, **kwargs)

    ff = test_with_fixtures(f)
    arg_names = self.fixture_arg_names(ff)
    parametrize = pytest.mark.parametrize(
        ','.join(arg_names), self.fixture_permutations())
    return parametrize(ff)
def test_create_snapshot(self):
    # take_snapshot() is fed one canned raw trace: size 5, with a single
    # frame at a.py line 2.
    raw_traces = [(5, (('a.py', 2),))]
    stubbed_returns = (
        ('is_tracing', True),
        ('get_traceback_limit', 5),
        ('_get_traces', raw_traces),
    )

    with contextlib.ExitStack() as stack:
        for attribute, value in stubbed_returns:
            stack.enter_context(
                patch.object(tracemalloc, attribute, return_value=value))

        snapshot = tracemalloc.take_snapshot()
        self.assertEqual(snapshot.traceback_limit, 5)
        self.assertEqual(len(snapshot.traces), 1)

        trace = snapshot.traces[0]
        self.assertEqual(trace.size, 5)
        self.assertEqual(len(trace.traceback), 1)

        frame = trace.traceback[0]
        self.assertEqual(frame.filename, 'a.py')
        self.assertEqual(frame.lineno, 2)
def setUp(self):
    # Restore the logger's effective level once the test is over.
    previous_level = log.getEffectiveLevel()
    self.addCleanup(log.setLevel, previous_level)
    self.resources = ExitStack()
    # Install a fresh event loop that is scheduled to stop almost
    # immediately, so calls to main() in these tests return quickly
    # instead of hanging the foreground test process.
    #
    # This may introduce a race: it depends on whether the call_later()
    # callback can run before run_forever() does, or can stop the loop
    # before all of its tasks complete.  In that case an error or warning
    # on stderr is likely, which may or may not fail the test.  This has
    # only been observed once, without enough information to be sure.
    default_loop = asyncio.get_event_loop()
    short_lived_loop = asyncio.new_event_loop()
    short_lived_loop.call_later(0.1, short_lived_loop.stop)
    # Closing the stack puts the original loop back.
    self.resources.callback(asyncio.set_event_loop, default_loop)
    asyncio.set_event_loop(short_lived_loop)
    self.addCleanup(self.resources.close)
def test_exception_handler_exception(self):
    # The client must see a 500 reply carrying the handler's error text,
    # and the handler must have recorded the ValueError.
    handler = ErroringErrorHandler()
    controller = ErrorController(handler)
    controller.start()
    self.addCleanup(controller.stop)
    with ExitStack() as stack:
        # Keep the console quiet: depending on timing, the exception may
        # or may not be logged.
        stack.enter_context(patch('aiosmtpd.smtp.log.exception'))
        client = stack.enter_context(
            SMTP(controller.hostname, controller.port))
        code, response = client.helo('example.com')
    self.assertEqual(code, 500)
    self.assertEqual(response,
                     b'Error: (ValueError) ErroringErrorHandler test')
    self.assertIsInstance(handler.error, ValueError)