我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用mock.mock.Mock()。
def test_resource_lazy_getattr(self): fake_manager = mock.Mock() return_resource = SampleResource(None, dict(uuid=mock.sentinel.fake_id, foo='bar', name='fake_name')) fake_manager.get.return_value = return_resource r = SampleResource(fake_manager, dict(uuid=mock.sentinel.fake_id, foo='bar')) self.assertTrue(hasattr(r, 'foo')) self.assertEqual('bar', r.foo) self.assertFalse(r.has_attached()) # Trigger load self.assertEqual('fake_name', r.name) fake_manager.get.assert_called_once_with(mock.sentinel.fake_id) self.assertTrue(r.has_attached()) # Missing stuff still fails after a second get self.assertRaises(AttributeError, getattr, r, 'abc')
def test_power_off_domain_not_running(self, mock_open): domain_name = 'domainA' domain_mock = mock.Mock() connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock domain_mock.destroy.side_effect = \ DumbLibvirtError(DOMAIN_NOT_RUNNING, conn=connection_mock) mock_open.return_value = connection_mock self.driver.power_off(domain_name) domain_mock.destroy.assert_called_with() connection_mock.close.assert_called_with()
def test_when_the_config_key_does_not_exists(self): from airflow import logging_config def side_effect(*args): if args[1] == 'logging_config_class': raise AirflowConfigException else: return "bla_bla_from_test" logging_config.conf.get = mock.Mock(side_effect=side_effect) with patch.object(logging_config.log, 'debug') as mock_debug: logging_config.configure_logging() mock_debug.assert_any_call('Could not find key logging_config_class in config') # Just default
def test_upload(self): self.client.post.return_value = mock_upload_response('/files/content', httplib.CREATED, None, 'files', 'POST_response.json') file_name = 'upload.jpg' file_path = path.join(path.dirname(__file__), '..', 'fixtures', 'files', file_name) folder_id = 'folder-id' @mock.patch('orangecloud_client.files.guess_type', return_value=('image/jpeg', 'binary')) @mock.patch('__builtin__.open', spec=open, return_value=MockFile()) @mock.patch('orangecloud_client.files.MultipartEncoder', return_value=mock.Mock()) def fire_test(mock_multipart_encoder, mock_open, _): data = mock_multipart_encoder() data.content_type = 'upload content type' response_file = self.files.upload(file_path, folder_id) self.client.post.assert_called_with(self.client.post.return_value.url, data=data, headers={'Content-Type': data.content_type}) self.assertEqual('file-id', response_file.fileId) self.assertEqual('file-name', response_file.fileName) fire_test()
def test_poster_with_mentions_twitter(self, tweepy_mock, tweepy_oauth_mock, phantom_driver): mock_body = open(os.path.join(os.path.dirname(__file__), 'data', 'crs_page.html')).read() posters_register._posters = [] posters_register.register_poster(tweet_status, 'twitter_done') m = mock.Mock() m.update_with_media.return_value = mock.Mock(id='1') tweepy_mock.return_value = m phantom_driver.return_value = MockPhantomJS(mock_body) fetch_and_update() self.assertEqual(m.update_status.call_count, 1) self.assertEqual(m.update_with_media.call_count, 1) m.update_status.assert_called_with( '@4stagi @patrick91 Nuovo bollettino meteo ?', in_reply_to_status_id='1' )
def prepare(self): self.instance_ctl = mock.Mock() self.volume_ctl = mock.Mock() self.volume_type_ctl = mock.Mock() self.entity_ctl = mock.Mock() self.app_ctl = mock.Mock() self.auth_adapter = mock.Mock() routes.instance_ctl = self.instance_ctl routes.volume_ctl = self.volume_ctl routes.volume_type_ctl = self.volume_type_ctl routes.entity_ctl = self.entity_ctl routes.app_ctl = self.app_ctl routes.auth_adapter = self.auth_adapter self.app = flask.Flask("almanach") self.app.register_blueprint(routes.api)
def test_build_version_with_valid_package_name(self): package = Mock(project_name='foo', version='1.0.0') setattr(settings, 'HEARTBEAT', {'package_name': 'foo'}) with mock.patch.object(build.WorkingSet, 'find', return_value=package): distro = build.check(request=None) assert distro == {'name': 'foo', 'version': '1.0.0'}
def test_get_distribution_list(self, dist_list): dist_list.return_value = [ Mock(project_name=i, version='1.0.0') for i in range(3)] distro = distribution_list.check(request=None) assert {'version': '1.0.0', 'name': 1} in distro assert {'version': '1.0.0', 'name': 2} in distro
def test__lshw(self, udev_mock, po, wm, output_helper, hwd): wm.return_value = '/valid/path' udev_mock.return_value = "mocked_udevadm_out" process_mock = mock.Mock() attrs = {'communicate.return_value': (output_helper.lshw_out['stdout'], 'error')} process_mock.configure_mock(**attrs) po.return_value = process_mock out = hwd._lshw() expected = output_helper.lshw_out['expected_return'] assert out == expected
def test__udevadm(self, po, output_helper, hwd): # additional test for virtualized env? process_mock = mock.Mock() attrs = {'communicate.return_value': (output_helper._udevadm_out['stdout'], 'error')} process_mock.configure_mock(**attrs) po.return_value = process_mock out = hwd._udevadm('/dev/sda') expected = output_helper._udevadm_out['expected_return'] assert out == expected
def test__updates_needed_not(self, po, apt): """ No updates pending """ process_mock = mock.Mock() attrs = {'communicate.return_value': ("", "0;0")} process_mock.configure_mock(**attrs) po.return_value = process_mock ret = apt._updates_needed() assert po.called is True assert ret is False
def test__updates_needed(self, po, apt): """ Updates pending """ process_mock = mock.Mock() attrs = {'communicate.return_value': ("", "1;1")} process_mock.configure_mock(**attrs) po.return_value = process_mock ret = apt._updates_needed() assert po.called is True assert ret is True
def core_mock(self): return mock.Mock()
def setUp(self): super(TestSnmpClient, self).setUp() self.command_generator_mock = mock.Mock() self.pysnmp_mock = mock.Mock() self.oneliner_cmdgen = mock.Mock() self.oneliner_cmdgen.CommandGenerator.return_value = \ self.command_generator_mock self.oneliner_cmdgen.CommunityData.return_value = \ sentinel.community_data self.oneliner_cmdgen.UdpTransportTarget.return_value = \ sentinel.udp_transport_target self.snmp_client = snmp_client.SnmpClient( oneliner_cmdgen=self.oneliner_cmdgen, host=sentinel.hostname, port=sentinel.port, community=sentinel.community, timeout=sentinel.timeout, retries=sentinel.retries, ) self.all_error_indications = [ (errind.__dict__.get(a).__class__.__name__, errind.__dict__.get(a)) for a in dir(errind) if isinstance(errind.__dict__.get(a), ErrorIndication)]
def test_power_on(self, mock_open): domain_mock = mock.Mock() connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock mock_open.return_value = connection_mock self.driver.power_on('domainA') mock_open.assert_called_with('hello') connection_mock.lookupByName.assert_called_with('domainA') domain_mock.create.assert_called_with() connection_mock.close.assert_called_with()
def test_power_off(self, mock_open): domain_mock = mock.Mock() connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock mock_open.return_value = connection_mock self.driver.power_off('domainA') mock_open.assert_called_with('hello') connection_mock.lookupByName.assert_called_with('domainA') domain_mock.destroy.assert_called_with() connection_mock.close.assert_called_with()
def test_get_power_state_on(self, mock_open): domain_mock = mock.Mock() domain_mock.isActive.return_value = 1 connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock mock_open.return_value = connection_mock self.assertEqual(drivers.POWER_ON, self.driver.get_power_state('domainA')) mock_open.assert_called_with('hello') connection_mock.lookupByName.assert_called_with('domainA') domain_mock.isActive.assert_called_with() connection_mock.close.assert_called_with()
def test_get_power_state_off(self, mock_open): domain_mock = mock.Mock() domain_mock.isActive.return_value = 0 connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock mock_open.return_value = connection_mock self.assertEqual(drivers.POWER_OFF, self.driver.get_power_state('domainA')) mock_open.assert_called_with('hello') connection_mock.lookupByName.assert_called_with('domainA') domain_mock.isActive.assert_called_with() connection_mock.close.assert_called_with()
def test_get_power_state_domain_not_found(self, mock_open): connection_mock = mock.Mock() connection_mock.lookupByName.side_effect = \ libvirt.libvirtError('virDomainLookupByName() failed', conn=connection_mock) mock_open.return_value = connection_mock self.assertRaises(drivers.DeviceNotFound, self.driver.get_power_state, 'domainA') connection_mock.close.assert_called_with()
def test_power_on_domain_not_found(self, mock_open): connection_mock = mock.Mock() connection_mock.lookupByName.side_effect = \ libvirt.libvirtError('virDomainLookupByName() failed', conn=connection_mock) mock_open.return_value = connection_mock self.assertRaises(drivers.DeviceNotFound, self.driver.power_on, 'domainA') connection_mock.close.assert_called_with()
def test_power_off_domain_not_found(self, mock_open): connection_mock = mock.Mock() connection_mock.lookupByName.side_effect = \ libvirt.libvirtError('virDomainLookupByName() failed', conn=connection_mock) mock_open.return_value = connection_mock self.assertRaises(drivers.DeviceNotFound, self.driver.power_off, 'domainA') connection_mock.close.assert_called_with()
def test_power_on_generic_error_goes_through(self, mock_open): domain_mock = mock.Mock() connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock domain_mock.create.side_effect = SpecificException() mock_open.return_value = connection_mock self.assertRaises(SpecificException, self.driver.power_on, 'domainA') connection_mock.close.assert_called_with()
def test_power_on_stays_connected(self, mock_open): domain_mock = mock.Mock() connection_mock = mock.Mock() connection_mock.lookupByName.return_value = domain_mock mock_open.return_value = connection_mock self.driver.power_on('domainA') mock_open.assert_called_with('hello') connection_mock.lookupByName.assert_called_with('domainA') domain_mock.create.assert_called_with() self.assertFalse(connection_mock.close.called)
def setUp(self): super(PDUTestCase, self).setUp() self.community = 'test1212' self.core_mock = mock.Mock() self.pdu = self.pdu_class(name='test_pdu', core=self.core_mock) self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness( self.pdu, '127.0.0.1', random.randint(20000, 30000), self.community) self.pdu_test_harness.start()
def mocked_resource(): return mock.Mock()
def test_backup_mysql_status_updated(mock_get_dst, mock_backup_stream, mock_prep_status, mock_mysql_source, config_content): s_config = config_content.format(destination="ssh", port='1234') buf = StringIO.StringIO(s_config) config = ConfigParser.ConfigParser() config.readfp(buf) mock_dst = mock.Mock() mock_get_dst.return_value = mock_dst backup_mysql("hourly", config) mock_dst.status.assert_called_once()
def mock_paginator(): return mock.Mock(spec=botocore.client.Paginator)
def mock_as_client(mock_paginator): mock_as_client = mock.Mock() mock_as_client.get_paginator.return_value = mock_paginator return mock_as_client
def mock_ec2(): return mock.Mock(spec=botocore.client)
def mock_ec2_client(): return mock.Mock()
def mock_ssh_connection(): return mock.Mock(spec=SSHClient)
def mock_aws_manager(): return mock.Mock(spec=AwsManager)
def mock_instance_manager(): return mock.Mock(spec=InstanceSshManager)
def test_rum_gets_list_of_instances_to_upgrade( rolling_upgrade_manager, mock_aws_manager ): Instance = namedtuple('Instance', ['id']) instances = [Instance('instance1'), Instance('instance2'), Instance('instance3')] rolling_upgrade_manager.compare_instance_to_config = mock.Mock() mock_aws_manager.get_instances_for_asg.return_value = instances rolling_upgrade_manager.compare_instance_to_config.side_effect = \ ([], [], ['diff']) result = rolling_upgrade_manager.get_instances_to_upgrade("test-asg", {}) assert len(result) == 1 assert instances[2] in result rolling_upgrade_manager.compare_instance_to_config.side_effect = \ (['diff1', 'diff2'], [], ['diff']) result = rolling_upgrade_manager.get_instances_to_upgrade("test-asg", {}) assert len(result) == 2 assert instances[0] in result assert instances[2] in result rolling_upgrade_manager.compare_instance_to_config.side_effect = \ ([], [], []) result = rolling_upgrade_manager.get_instances_to_upgrade("test-asg", {}) assert len(result) == 0 mock_aws_manager.get_instances_for_asg.assert_has_calls(( mock.call('test-asg'), mock.call('test-asg'), mock.call('test-asg'), ))
def test_poster_with_tags_facebook(self, facebook_mock, phantom_driver): mock_body = open(os.path.join(os.path.dirname(__file__), 'data', 'crs_page.html')).read() posters_register._posters = [] posters_register.register_poster(facebook_status, 'facebook_done') m = mock.Mock() m.put_photo.side_effect = mock.Mock(id='1') facebook_mock.return_value = m phantom_driver.return_value = MockPhantomJS(mock_body) self.assertEqual(len(Log.objects.all()), 0) meteo_fake_status = '?? Emergenza: A+ , A- , O+ , O- , B- , AB- , AB+\n' meteo_upload_fake = os.path.join(os.path.dirname(__file__), 'data', 'meteo_fake_upload.png') facebook_status(meteo_fake_status, meteo_upload_fake) self.assertEqual(m.put_wall_post.call_count, 0) self.assertEqual(m.put_photo.call_count, 1) self.assertEqual( m.put_photo.call_args[1]['tags'], '[{"tag_uid": "andrea.stagi"}, {"tag_uid": "patrick.arminio"}]' )