The following 50 code examples, extracted from open-source Python projects, illustrate how to use contextlib.nested().
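Before the project excerpts, a minimal orientation sketch may help; it is not taken from any of the projects below, and 'a.txt' and 'b.txt' are placeholder file names. It shows what contextlib.nested() does on Python 2 and the multi-manager with statement that replaced it (contextlib.nested() was deprecated in Python 2.7 and removed in Python 3.2; on Python 3.3+ contextlib.ExitStack covers the variable-length case, as several examples below reimplement).

# Minimal sketch; the nested() call itself only runs on Python 2.
# 'a.txt' and 'b.txt' are placeholder file names.
import contextlib

# contextlib.nested() enters several context managers at once and yields
# their __enter__() results as a tuple.
with contextlib.nested(open('a.txt'), open('b.txt')) as (fa, fb):
    combined = fa.read() + fb.read()

# The replacement: the multi-manager "with" statement (Python 2.7+ / 3.x).
with open('a.txt') as fa, open('b.txt') as fb:
    combined = fa.read() + fb.read()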
def nested(*managers):
    exits = []
    vars = []
    exc = (None, None, None)
    try:
        for mgr in managers:
            exit = mgr.__exit__
            enter = mgr.__enter__
            vars.append(enter())
            exits.append(exit)
        yield vars
    except:
        exc = sys.exc_info()
    finally:
        while exits:
            exit = exits.pop()
            try:
                if exit(*exc):
                    exc = (None, None, None)
            except:
                exc = sys.exc_info()
        if exc != (None, None, None):
            reraise(exc[0], exc[1], exc[2])
def test_spawn(self):
    self._create_instance()
    self._create_network()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.return_value = 'ami-1234abc'
        mock_network.return_value = (self.subnet_id, '192.168.10.5',
                                     None, None)
        mock_secgrp.return_value = []
        self._create_nova_vm()
        fake_instances = self.fake_ec2_conn.get_only_instances()
        self.assertEqual(len(fake_instances), 1)
        inst = fake_instances[0]
        self.assertEqual(inst.vpc_id, self.vpc.id)
        self.assertEqual(self.subnet_id, inst.subnet_id)
        self.assertEqual(inst.tags['Name'], 'fake_instance')
        self.assertEqual(inst.tags['openstack_id'], self.uuid)
        self.assertEqual(inst.image_id, 'ami-1234abc')
        self.assertEqual(inst.region.name, self.region_name)
        self.assertEqual(inst.key_name, 'None')
        self.assertEqual(inst.instance_type, 't2.small')
        self.reset()
def test_spawn_with_key(self):
    self._create_instance(key_name='fake_key', key_data='fake_key_data')
    self._create_network()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.return_value = 'ami-1234abc'
        mock_network.return_value = (self.subnet_id, '192.168.10.5',
                                     None, None)
        mock_secgrp.return_value = []
        self._create_nova_vm()
        fake_instances = self.fake_ec2_conn.get_only_instances()
        self.assertEqual(len(fake_instances), 1)
        inst = fake_instances[0]
        self.assertEqual(inst.key_name, 'fake_key')
        self.reset()
def test_destory_instance_terminated_on_aws(self):
    self._create_vm_in_aws_nova()
    fake_instances = self.fake_ec2_conn.get_only_instances()
    self.fake_ec2_conn.stop_instances(instance_ids=[fake_instances[0].id])
    self.fake_ec2_conn.terminate_instances(
        instance_ids=[fake_instances[0].id])
    with contextlib.nested(
        mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
        mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
        mock.patch.object(EC2Driver, '_wait_for_state'),
    ) as (fake_stop, fake_terminate, fake_wait):
        self.conn.destroy(self.context, self.instance, None, None)
        fake_stop.assert_not_called()
        fake_terminate.assert_not_called()
        fake_wait.assert_not_called()
        self.reset()
def _stubs(self, network_info, subnet_info, ports_info):
    self.ipam = quark.ipam.QuarkIpamANY()
    with contextlib.nested(
            mock.patch("neutron.common.rpc.get_notifier"),
            mock.patch("neutron.quota.QUOTAS.limit_check")):
        net = network_api.create_network(self.context, network_info)
        mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
        self.context.is_admin = True
        macrng_api.create_mac_address_range(self.context, mac)
        self.context.is_admin = False
        subnet_info['subnet']['network_id'] = net['id']
        sub = subnet_api.create_subnet(self.context, subnet_info)
        ports = []
        for port_info in ports_info:
            port_info['port']['network_id'] = net['id']
            ports.append(port_api.create_port(self.context, port_info))
        yield net, sub, ports
def vulture():
    """ try to find dead code paths """
    with api.quiet():
        if not api.local('which vulture').succeeded:
            print 'vulture not found, installing it'
            api.local('pip install vulture')
    ignore_functions_grep = 'egrep -v "{0}"'.format(
        '|'.join(VULTURE_IGNORE_FUNCTIONS))
    excluded = ",".join(VULTURE_EXCLUDE_PATHS)
    excluded_paths = (' --exclude ' + excluded) if excluded else ''
    vulture_cmd = '\n vulture {pkg_name}{exclude}{pipes}'
    vulture_cmd = vulture_cmd.format(
        pkg_name=PKG_NAME,
        exclude=excluded_paths,
        pipes='|'.join(['', ignore_functions_grep]))
    changedir = api.lcd(os.path.dirname(__file__))
    warn_only = api.settings(warn_only=True)
    be_quit = api.hide('warnings')
    with contextlib.nested(changedir, warn_only, be_quit):
        result = api.local(vulture_cmd, capture=True)
        exit_code = result.return_code
        print result.strip()
        raise SystemExit(exit_code)
def _patch_and_run_amira(
        self, region_name, queue_name, contents, created_objects):
    """Patches all the external dependencies and runs AMIRA."""
    self._results_uploader_mock = MagicMock()
    with nested(
        patch.object(
            S3Handler, '__init__', autospec=True, return_value=None),
        patch.object(
            S3Handler, 'get_contents_as_string', autospec=True,
            side_effect=contents),
        patch.object(
            SqsHandler, '__init__', autospec=True, return_value=None),
        patch.object(
            SqsHandler, 'get_created_objects', autospec=True,
            side_effect=created_objects),
    ) as (
        __,
        self._patched_get_contents_as_string,
        __,
        self._patched_get_created_objects,
    ):
        amira = AMIRA(region_name, queue_name)
        amira.register_results_uploader(self._results_uploader_mock)
        amira.run()
def test_run_rule_calls_garbage_collect(ea):
    start_time = '2014-09-26T00:00:00Z'
    end_time = '2014-09-26T12:00:00Z'
    ea.buffer_time = datetime.timedelta(hours=1)
    ea.run_every = datetime.timedelta(hours=1)
    with contextlib.nested(mock.patch.object(ea.rules[0]['type'], 'garbage_collect'),
                           mock.patch.object(ea, 'run_query')) as (mock_gc, mock_get_hits):
        ea.run_rule(ea.rules[0], ts_to_dt(end_time), ts_to_dt(start_time))

    # Running ElastAlert every hour for 12 hours, we should see self.garbage_collect called 12 times.
    assert mock_gc.call_count == 12

    # The calls should be spaced 1 hour apart
    expected_calls = [ts_to_dt(start_time) + datetime.timedelta(hours=i) for i in range(1, 13)]
    for e in expected_calls:
        mock_gc.assert_any_call(e)
def __call__(self, f):
    @wrapt.decorator
    def test_with_fixtures(wrapped, instance, args, kwargs):
        fixture_instances = [i for i in list(args) + list(kwargs.values())
                             if i.__class__ in self.fixture_classes
                             or i.scenario in self.requested_fixtures]
        dependency_ordered_fixtures = self.topological_sort_instances(fixture_instances)
        if six.PY2:
            with contextlib.nested(*list(dependency_ordered_fixtures)):
                return wrapped(*args, **kwargs)
        else:
            with contextlib.ExitStack() as stack:
                for fixture in dependency_ordered_fixtures:
                    stack.enter_context(fixture)
                return wrapped(*args, **kwargs)

    ff = test_with_fixtures(f)
    arg_names = self.fixture_arg_names(ff)
    return pytest.mark.parametrize(','.join(arg_names), self.fixture_permutations())(ff)
def _exec_(self, source):
    source = '''\
    (function() {{
        {0};
        {1};
    }})()'''.format(
        encode_unicode_codepoints(self._source),
        encode_unicode_codepoints(source)
    )
    source = str(source)

    # backward compatibility
    with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
        js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
        try:
            script = engine.compile(source)
        except js_errors as e:
            raise exceptions.RuntimeError(e)
        try:
            value = script.run()
        except js_errors as e:
            raise exceptions.ProgramError(e)
        return self.convert(value)
def exec_(self, source):
    source = '''\
    (function() {{
        {0};
        {1};
    }})()'''.format(
        encode_unicode_codepoints(self._source),
        encode_unicode_codepoints(source)
    )
    source = str(source)

    import PyV8
    import contextlib
    # backward compatibility
    with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
        js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
        try:
            script = engine.compile(source)
        except js_errors as e:
            raise RuntimeError(e)
        try:
            value = script.run()
        except js_errors as e:
            raise ProgramError(e)
        return self.convert(value)
def test_pkey_calls_paramiko_RSAKey(self):
    with contextlib.nested(
            mock.patch('paramiko.RSAKey.from_private_key'),
            mock.patch('cStringIO.StringIO')) as (rsa_mock, cs_mock):
        cs_mock.return_value = mock.sentinel.csio
        pkey = 'mykey'
        ssh.Client('localhost', 'root', pkey=pkey)
        rsa_mock.assert_called_once_with(mock.sentinel.csio)
        cs_mock.assert_called_once_with('mykey')
        rsa_mock.reset_mock()
        cs_mock.rest_mock()
        pkey = mock.sentinel.pkey
        # Shouldn't call out to load a file from RSAKey, since
        # a sentinel isn't a basestring...
        ssh.Client('localhost', 'root', pkey=pkey)
        rsa_mock.assert_not_called()
        cs_mock.assert_not_called()
def nested(*contexts):
    with contextlib.ExitStack() as stack:
        yield [stack.enter_context(c) for c in contexts]
def nested(*contexts):
    """
    Reimplementation of nested in python 3.
    """
    with ExitStack() as stack:
        results = [
            stack.enter_context(context)
            for context in contexts
        ]
        yield results
def create_upload_func(self, ns, definition, path, operation):
    request_schema = definition.request_schema or Schema()
    response_schema = definition.response_schema or Schema()

    @self.add_route(path, operation, ns)
    @wraps(definition.func)
    def upload(**path_data):
        request_data = load_query_string_data(request_schema)

        if not request.files:
            raise BadRequest("No files were uploaded")

        uploads = [
            temporary_upload(name, fileobj)
            for name, fileobj in request.files.items()
            if not self.exclude_func(name, fileobj)
        ]
        with nested(*uploads) as files:
            response_data = definition.func(files, **merge_data(path_data, request_data))
            if response_data is None:
                return "", 204
            return dump_response_data(response_schema, response_data, operation.value.default_code)

    if definition.request_schema:
        upload = qs(definition.request_schema)(upload)

    if definition.response_schema:
        upload = response(definition.response_schema)(upload)

    return upload
def flatten(self, *args):
    """Flatten a nested tuple/list of tuples/lists"""
    flattened_list = []
    for element in args:
        if isinstance(element, (tuple, list)):
            flattened_list.extend(self.flatten(*element))
        else:
            flattened_list.extend([element])
    return flattened_list
def _create_vm_in_aws_nova(self):
    self._create_instance()
    self._create_network()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.return_value = 'ami-1234abc'
        mock_network.return_value = (self.subnet_id, '192.168.10.5',
                                     None, None)
        mock_secgrp.return_value = []
        self._create_nova_vm()
def test_add_ssh_keys_key_exists(self):
    fake_key = 'fake_key'
    fake_key_data = 'abcdefgh'
    self.conn.ec2_conn.import_key_pair(fake_key, fake_key_data)
    with contextlib.nested(
        mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
        mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
    ) as (fake_get, fake_import):
        fake_get.return_value = True
        self.conn._add_ssh_keys(fake_key, fake_key_data)
        fake_get.assert_called_once_with(fake_key)
        fake_import.assert_not_called()
def test_add_ssh_keys_key_absent(self):
    fake_key = 'fake_key'
    fake_key_data = 'abcdefgh'
    with contextlib.nested(
        mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
        mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
    ) as (fake_get, fake_import):
        fake_get.return_value = False
        self.conn._add_ssh_keys(fake_key, fake_key_data)
        fake_get.assert_called_once_with(fake_key)
        fake_import.assert_called_once_with(fake_key, fake_key_data)
def test_spawn_with_network_error(self):
    self._create_instance()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.return_value = 'ami-1234abc'
        mock_network.return_value = (None, None, None, None)
        mock_secgrp.return_value = []
        self.assertRaises(exception.BuildAbortException,
                          self._create_nova_vm)
        self.reset()
def test_spawn_with_network_error_from_aws(self):
    self._create_instance()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.return_value = 'ami-1234abc'
        mock_network.return_value = (None, '192.168.10.5', None, None)
        mock_secgrp.return_value = []
        self.assertRaises(exception.BuildAbortException,
                          self._create_nova_vm)
        self.reset()
def test_spawn_with_image_error(self):
    self._create_instance()
    self._create_network()
    with contextlib.nested(
        mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
        mock.patch.object(EC2Driver, '_process_network_info'),
        mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
    ) as (mock_image, mock_network, mock_secgrp):
        mock_image.side_effect = exception.BuildAbortException('fake')
        mock_network.return_value = ('subnet-1234abc', '192.168.10.5',
                                     None, None)
        mock_secgrp.return_value = []
        self.assertRaises(exception.BuildAbortException,
                          self._create_nova_vm)
        self.reset()
def test_destroy_instance_not_found(self):
    self._create_instance()
    with contextlib.nested(
        mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
        mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
        mock.patch.object(EC2Driver, '_wait_for_state'),
    ) as (fake_stop, fake_terminate, fake_wait):
        self.conn.destroy(self.context, self.instance, None, None)
        fake_stop.assert_not_called()
        fake_terminate.assert_not_called()
        fake_wait.assert_not_called()
        self.reset()
def _stubs(self, network_info, subnet_info, port_info):
    self.ipam = quark.ipam.QuarkIpamANY()
    with contextlib.nested(
            mock.patch("neutron.common.rpc.get_notifier"),
            mock.patch("neutron.quota.QUOTAS.limit_check")):
        net = network_api.create_network(self.context, network_info)
        mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
        self.context.is_admin = True
        macrng_api.create_mac_address_range(self.context, mac)
        self.context.is_admin = False
        subnet_info['subnet']['network_id'] = net['id']
        port_info['port']['network_id'] = net['id']
        sub = subnet_api.create_subnet(self.context, subnet_info)
        port = port_api.create_port(self.context, port_info)
        yield net, sub, port
def unzip(infile, outfile):
    with contextlib.nested(gzip.open(infile, 'rb'),
                           open(outfile, 'w')) as (gz, out):
        for data in iter(functools.partial(gz.read, 1024), ''):
            out.write(data)

###########################################################################
# END: Utility functions
###########################################################################
def read_files(*file_paths):
    files = []
    for i, p in enumerate(file_paths):
        if p:
            files.append(open(p, mode="r"))
            print 'Opened:', p
        else:
            files.append(EmptyFile())
            print 'WARNING: no path provided for file {} in list.'.format(i)
    with contextlib.nested(*files) as entered_files:
        for lines in izip(*entered_files):
            yield lines
def makeSummaryFiles(iniName):
    os.chdir(dirs['tmp'])
    datacsv = 'filtered_measurements.csv'
    ndatacsv = 'fastest_measurements.csv'
    bulkdatacsv = 'all_measurements.csv'
    with nested(open(join(datacsv), 'w'),
                open(join(ndatacsv), 'w')) as (df, ndf):
        try:
            #bdf = bz2.BZ2File( join(bulkdatacsv), 'w')
            bdf = open(join(bulkdatacsv), 'w')
            writeHeader(df); writeHeader(ndf); writeHeader(bdf)
            walkDatFiles(df, ndf, bdf)
        except (OSError, IOError), err:
            if logger: logger.error(err)
        finally:
            bdf.close()
    try:
        if dirs['dat_sweep']:
            copy2(join(datacsv), dirs['dat_sweep'])
            copy2(join(ndatacsv), dirs['dat_sweep'])
            copy2(join(bulkdatacsv), dirs['dat_sweep'])
            if logger: logger.info('copy %s files to %s', '*.csv', dirs['dat_sweep'])
    except (OSError, IOError), err:
        if logger: logger.error(err)

# =============================
# sweep .code & .log
# =============================
def test_interface_initially_up(self):
    combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                  "workload_id", "endpoint_id")
    ip_type = futils.IPV4
    local_ep = self.create_endpoint(combined_id, ip_type)
    ips = ["1.2.3.4"]
    iface = "tapabcdef"
    data = {
        'state': "active",
        'endpoint': "endpoint_id",
        'mac': stub_utils.get_mac(),
        'name': iface,
        'ipv4_nets': ips,
        'profile_ids': ["prof1"]
    }

    # We can only get on_interface_update calls after the first
    # on_endpoint_update, so trigger that.
    with nested(
        mock.patch('calico.felix.devices.set_routes'),
        mock.patch('calico.felix.devices.configure_interface_ipv4'),
        mock.patch('calico.felix.devices.interface_up'),
        mock.patch('calico.felix.devices.interface_exists'),
    ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
        m_iface_up.return_value = True
        m_iface_exists.return_value = True
        local_ep.on_endpoint_update(data, async=True)
        self.step_actor(local_ep)
        self.assertEqual(local_ep._mac, data['mac'])
        self.assertTrue(m_conf.called)
        self.assertTrue(local_ep._device_in_sync)
        # Interface is up so we should get the routes straight away.
        m_set_routes.assert_called_once_with(ip_type, set(ips), iface,
                                             data['mac'], reset_arp=True)
def test_interface_goes_down_removes_routes(self):
    combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                  "workload_id", "endpoint_id")
    ip_type = futils.IPV4
    local_ep = self.create_endpoint(combined_id, ip_type)
    ips = ["1.2.3.4"]
    iface = "tapabcdef"
    data = {
        'state': "active",
        'endpoint': "endpoint_id",
        'mac': stub_utils.get_mac(),
        'name': iface,
        'ipv4_nets': ips,
        'profile_ids': ["prof1"]
    }

    # We can only get on_interface_update calls after the first
    # on_endpoint_update, so trigger that.
    with nested(
        mock.patch('calico.felix.devices.set_routes'),
        mock.patch('calico.felix.devices.configure_interface_ipv4'),
        mock.patch('calico.felix.devices.interface_up'),
        mock.patch('calico.felix.devices.interface_exists'),
    ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
        m_iface_up.return_value = True
        m_iface_exists.return_value = True
        local_ep.on_endpoint_update(data, async=True)
        self.step_actor(local_ep)
        self.assertTrue(m_conf.called)  # Check we did the "UP" path.

    # Now pretend to send an interface update.
    with mock.patch('calico.felix.devices.set_routes') as m_set_routes:
        local_ep.on_interface_update(False, async=True)
        self.step_actor(local_ep)
        m_set_routes.assert_called_once_with(ip_type, set(), iface, None)
        self.assertTrue(local_ep._device_in_sync)
def test_configure_interface_ipv6_mainline(self):
    """
    Test that configure_interface_ipv6_mainline
    - opens and writes to the /proc system to enable proxy NDP on the
      interface.
    - calls ip -6 neigh to set up the proxy targets.

    Mainline test has two proxy targets.
    """
    m_open = mock.mock_open()
    rc = futils.CommandOutput("", "")
    if_name = "tap3e5a2b34222"
    proxy_target = "2001::3:4"

    open_patch = mock.patch('__builtin__.open', m_open, create=True)
    m_check_call = mock.patch('calico.felix.futils.check_call',
                              return_value=rc)

    with nested(open_patch, m_check_call) as (_, m_check_call):
        devices.configure_interface_ipv6(if_name, proxy_target)
        calls = [mock.call('/proc/sys/net/ipv6/conf/%s/proxy_ndp' % if_name,
                           'wb'),
                 M_ENTER,
                 mock.call().write('1'),
                 M_CLEAN_EXIT]
        m_open.assert_has_calls(calls)
        ip_calls = [mock.call(["ip", "-6", "neigh", "add", "proxy",
                               str(proxy_target), "dev", if_name])]
        m_check_call.assert_has_calls(ip_calls)