Python mock.mock 模块,Mock() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 mock.mock.Mock()。

项目:OpenStackClient_VBS    作者:Huawei    | 项目源码 | 文件源码
def test_resource_lazy_getattr(self):
        """Reading an attribute that was not supplied calls ``manager.get`` once.

        ``foo`` is given up front and must be readable while
        ``has_attached()`` is still false; reading ``name`` must call the
        manager's ``get`` exactly once with the resource uuid.
        """
        resource_id = mock.sentinel.fake_id
        manager = mock.Mock()
        manager.get.return_value = SampleResource(
            None, dict(uuid=resource_id, foo='bar', name='fake_name'))

        resource = SampleResource(manager, dict(uuid=resource_id, foo='bar'))
        self.assertTrue(hasattr(resource, 'foo'))
        self.assertEqual('bar', resource.foo)
        self.assertFalse(resource.has_attached())

        # 'name' was not in the initial info, so this read triggers the load.
        self.assertEqual('fake_name', resource.name)
        manager.get.assert_called_once_with(resource_id)
        self.assertTrue(resource.has_attached())

        # An attribute absent from the fetched resource too must still raise.
        with self.assertRaises(AttributeError):
            getattr(resource, 'abc')
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_off_domain_not_running(self, mock_open):
        """``power_off`` returns normally when ``destroy`` reports not-running.

        ``destroy`` is configured to raise a ``DumbLibvirtError`` built with
        ``DOMAIN_NOT_RUNNING``; the connection must still be closed.
        """
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        domain.destroy.side_effect = DumbLibvirtError(DOMAIN_NOT_RUNNING,
                                                      conn=connection)
        mock_open.return_value = connection

        self.driver.power_off('domainA')

        domain.destroy.assert_called_with()
        connection.close.assert_called_with()
项目:OpenStackClient_Workspace    作者:Huawei    | 项目源码 | 文件源码
def test_resource_lazy_getattr(self):
        """Reading an attribute that was not supplied calls ``manager.get`` once.

        ``foo`` is given up front and must be readable while
        ``has_attached()`` is still false; reading ``name`` must call the
        manager's ``get`` exactly once with the resource uuid.
        """
        resource_id = mock.sentinel.fake_id
        manager = mock.Mock()
        manager.get.return_value = SampleResource(
            None, dict(uuid=resource_id, foo='bar', name='fake_name'))

        resource = SampleResource(manager, dict(uuid=resource_id, foo='bar'))
        self.assertTrue(hasattr(resource, 'foo'))
        self.assertEqual('bar', resource.foo)
        self.assertFalse(resource.has_attached())

        # 'name' was not in the initial info, so this read triggers the load.
        self.assertEqual('fake_name', resource.name)
        manager.get.assert_called_once_with(resource_id)
        self.assertTrue(resource.has_attached())

        # An attribute absent from the fetched resource too must still raise.
        with self.assertRaises(AttributeError):
            getattr(resource, 'abc')
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_when_the_config_key_does_not_exists(self):
        """A missing ``logging_config_class`` key is reported via ``log.debug``.

        ``conf.get`` is stubbed so that the ``logging_config_class`` key
        raises ``AirflowConfigException`` and any other key returns a dummy
        string.  The stub is installed with ``patch.object`` so the real
        ``conf.get`` is restored when the block exits instead of leaking
        into tests that run afterwards.
        """
        from airflow import logging_config

        def side_effect(*args):
            # args[1] is the second positional argument given to conf.get.
            if args[1] == 'logging_config_class':
                raise AirflowConfigException
            return "bla_bla_from_test"

        fake_get = mock.Mock(side_effect=side_effect)

        with patch.object(logging_config.conf, 'get', new=fake_get), \
                patch.object(logging_config.log, 'debug') as mock_debug:
            logging_config.configure_logging()
            mock_debug.assert_any_call('Could not find key logging_config_class in config')

    # Just default
项目:orangecloud-client    作者:antechrestos    | 项目源码 | 文件源码
def test_upload(self):
        """Upload a fixture file and check the POST call and the parsed reply.

        ``MultipartEncoder``, the builtin ``open`` and ``guess_type`` are
        patched for the duration of the upload call.
        """
        upload_response = mock_upload_response('/files/content',
                                               httplib.CREATED,
                                               None,
                                               'files', 'POST_response.json')
        self.client.post.return_value = upload_response
        file_path = path.join(path.dirname(__file__), '..', 'fixtures', 'files',
                              'upload.jpg')
        folder_id = 'folder-id'

        # Patches are entered in the same order the stacked decorators
        # would activate them (innermost first).
        with mock.patch('orangecloud_client.files.MultipartEncoder',
                        return_value=mock.Mock()) as encoder_cls, \
                mock.patch('__builtin__.open', spec=open,
                           return_value=MockFile()), \
                mock.patch('orangecloud_client.files.guess_type',
                           return_value=('image/jpeg', 'binary')):
            encoded = encoder_cls()
            encoded.content_type = 'upload content type'
            uploaded = self.files.upload(file_path, folder_id)
            self.client.post.assert_called_with(
                self.client.post.return_value.url,
                data=encoded,
                headers={'Content-Type': encoded.content_type})
            self.assertEqual('file-id', uploaded.fileId)
            self.assertEqual('file-name', uploaded.fileName)
项目:meteosangue    作者:meteosangue    | 项目源码 | 文件源码
def test_poster_with_mentions_twitter(self, tweepy_mock, tweepy_oauth_mock, phantom_driver):
        """A fetch posts one media tweet and one reply mentioning two users.

        The page body served by the fake PhantomJS driver is read from the
        ``data/crs_page.html`` fixture next to this test module.
        """
        page_path = os.path.join(os.path.dirname(__file__), 'data', 'crs_page.html')
        # Read through a context manager so the fixture handle is closed
        # deterministically instead of being left to garbage collection.
        with open(page_path) as page:
            mock_body = page.read()

        posters_register._posters = []
        posters_register.register_poster(tweet_status, 'twitter_done')

        m = mock.Mock()
        # The reply below is expected to reference this fake status id.
        m.update_with_media.return_value = mock.Mock(id='1')

        tweepy_mock.return_value = m

        phantom_driver.return_value = MockPhantomJS(mock_body)

        fetch_and_update()

        self.assertEqual(m.update_status.call_count, 1)
        self.assertEqual(m.update_with_media.call_count, 1)
        m.update_status.assert_called_with(
            '@4stagi @patrick91 Nuovo bollettino meteo ?',
            in_reply_to_status_id='1'
        )
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_off_domain_not_running(self, mock_open):
        """``power_off`` returns normally when ``destroy`` reports not-running.

        ``destroy`` is configured to raise a ``DumbLibvirtError`` built with
        ``DOMAIN_NOT_RUNNING``; the connection must still be closed.
        """
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        domain.destroy.side_effect = DumbLibvirtError(DOMAIN_NOT_RUNNING,
                                                      conn=connection)
        mock_open.return_value = connection

        self.driver.power_off('domainA')

        domain.destroy.assert_called_with()
        connection.close.assert_called_with()
项目:almanach    作者:openstack    | 项目源码 | 文件源码
def prepare(self):
        """Install fresh mocks as controllers and build the Flask app.

        Each controller mock is stored on the test instance and assigned to
        the attribute of the same name on the ``routes`` module.
        """
        controller_names = ('instance_ctl', 'volume_ctl', 'volume_type_ctl',
                            'entity_ctl', 'app_ctl', 'auth_adapter')
        for attr in controller_names:
            setattr(self, attr, mock.Mock())
        for attr in controller_names:
            # routes gets the very same object the test instance holds.
            setattr(routes, attr, getattr(self, attr))

        self.app = flask.Flask("almanach")
        self.app.register_blueprint(routes.api)
项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def test_build_version_with_valid_package_name(self):
        """``build.check`` reports the name/version of the configured package."""
        fake_dist = Mock(project_name='foo', version='1.0.0')
        settings.HEARTBEAT = {'package_name': 'foo'}
        find_patch = mock.patch.object(build.WorkingSet, 'find',
                                       return_value=fake_dist)
        with find_patch:
            reported = build.check(request=None)
            assert reported == {'name': 'foo', 'version': '1.0.0'}
项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def test_get_distribution_list(self, dist_list):
        """``distribution_list.check`` lists the mocked distributions.

        Three fake distributions named 0, 1 and 2 are supplied; only the
        entries for 1 and 2 are asserted.
        """
        fake_dists = []
        for index in range(3):
            fake_dists.append(Mock(project_name=index, version='1.0.0'))
        dist_list.return_value = fake_dists

        reported = distribution_list.check(request=None)

        for expected_name in (1, 2):
            assert {'version': '1.0.0', 'name': expected_name} in reported
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def test__lshw(self, udev_mock, po, wm, output_helper, hwd):
        """``_lshw`` turns the canned lshw stdout into the expected value."""
        wm.return_value = '/valid/path'
        udev_mock.return_value = "mocked_udevadm_out"
        canned = output_helper.lshw_out
        # communicate() hands back (stdout fixture, 'error').
        po.return_value = mock.Mock(
            **{'communicate.return_value': (canned['stdout'], 'error')})

        result = hwd._lshw()

        assert result == output_helper.lshw_out['expected_return']
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def test__udevadm(self, po, output_helper, hwd):
        """``_udevadm('/dev/sda')`` parses the canned udevadm stdout."""
        # additional test for virtualized env?
        canned = output_helper._udevadm_out
        # communicate() hands back (stdout fixture, 'error').
        po.return_value = mock.Mock(
            **{'communicate.return_value': (canned['stdout'], 'error')})

        result = hwd._udevadm('/dev/sda')

        assert result == output_helper._udevadm_out['expected_return']
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def test__updates_needed_not(self, po, apt):
        """
        No updates pending: communicate() yields ("", "0;0")
        """
        po.return_value = mock.Mock(
            **{'communicate.return_value': ("", "0;0")})

        needed = apt._updates_needed()

        assert po.called is True
        assert needed is False
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def test__updates_needed(self, po, apt):
        """
        Updates pending: communicate() yields ("", "1;1")
        """
        po.return_value = mock.Mock(
            **{'communicate.return_value': ("", "1;1")})

        needed = apt._updates_needed()

        assert po.called is True
        assert needed is True
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def core_mock(self):
        """Return a new, unconfigured ``mock.Mock`` on every call."""
        fresh = mock.Mock()
        return fresh
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Build an ``SnmpClient`` around a mocked ``oneliner_cmdgen``.

        Also collects ``(class name, instance)`` pairs for every
        ``ErrorIndication`` instance found in the ``errind`` namespace.
        """
        super(TestSnmpClient, self).setUp()
        self.command_generator_mock = mock.Mock()
        self.pysnmp_mock = mock.Mock()
        self.oneliner_cmdgen = mock.Mock(**{
            'CommandGenerator.return_value': self.command_generator_mock,
            'CommunityData.return_value': sentinel.community_data,
            'UdpTransportTarget.return_value': sentinel.udp_transport_target,
        })

        client_kwargs = dict(
            oneliner_cmdgen=self.oneliner_cmdgen,
            host=sentinel.hostname,
            port=sentinel.port,
            community=sentinel.community,
            timeout=sentinel.timeout,
            retries=sentinel.retries,
        )
        self.snmp_client = snmp_client.SnmpClient(**client_kwargs)

        indications = []
        for attr_name in dir(errind):
            # Look the name up in errind's own __dict__ only.
            candidate = errind.__dict__.get(attr_name)
            if isinstance(candidate, ErrorIndication):
                indications.append((candidate.__class__.__name__, candidate))
        self.all_error_indications = indications
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_on(self, mock_open):
        """``power_on`` opens 'hello', creates the domain, then closes."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_on('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.create.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_off(self, mock_open):
        """``power_off`` opens 'hello', destroys the domain, then closes."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_off('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.destroy.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_get_power_state_on(self, mock_open):
        """``isActive() == 1`` is reported as ``drivers.POWER_ON``."""
        domain = mock.Mock(**{'isActive.return_value': 1})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        state = self.driver.get_power_state('domainA')
        self.assertEqual(drivers.POWER_ON, state)

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.isActive.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_get_power_state_off(self, mock_open):
        """``isActive() == 0`` is reported as ``drivers.POWER_OFF``."""
        domain = mock.Mock(**{'isActive.return_value': 0})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        state = self.driver.get_power_state('domainA')
        self.assertEqual(drivers.POWER_OFF, state)

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.isActive.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_get_power_state_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.get_power_state('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_on_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.power_on('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_off_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.power_off('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_on_generic_error_goes_through(self, mock_open):
        """An arbitrary error from ``create`` propagates; close still runs."""
        domain = mock.Mock(**{'create.side_effect': SpecificException()})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        with self.assertRaises(SpecificException):
            self.driver.power_on('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def test_power_on_stays_connected(self, mock_open):
        """``power_on`` creates the domain but leaves the connection open."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_on('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.create.assert_called_with()
        # close() must never have been invoked in this configuration.
        close_was_called = connection.close.called
        self.assertFalse(close_was_called)
项目:virtualpdu    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Create the PDU under test with a mocked core and start its harness."""
        super(PDUTestCase, self).setUp()
        self.community = 'test1212'
        self.core_mock = mock.Mock()
        self.pdu = self.pdu_class(name='test_pdu', core=self.core_mock)

        # Third positional argument: a random integer in 20000..30000
        # (presumably the listen port -- confirm against SNMPPDUHarness).
        random_number = random.randint(20000, 30000)
        self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness(
            self.pdu, '127.0.0.1', random_number, self.community)
        self.pdu_test_harness.start()
项目:cmdtree    作者:winkidney    | 项目源码 | 文件源码
def mocked_resource():
    """Return a new, unconfigured ``mock.Mock`` on every call."""
    resource = mock.Mock()
    return resource
项目:backup    作者:twindb    | 项目源码 | 文件源码
def test_backup_mysql_status_updated(mock_get_dst,
                                     mock_backup_stream,
                                     mock_prep_status,
                                     mock_mysql_source,
                                     config_content):
    """``backup_mysql`` calls the destination's ``status`` exactly once.

    The config is rendered from ``config_content`` with an ``ssh``
    destination and port ``1234``, then parsed with ``ConfigParser``.
    """
    s_config = config_content.format(destination="ssh", port='1234')
    buf = StringIO.StringIO(s_config)
    config = ConfigParser.ConfigParser()
    config.readfp(buf)
    mock_dst = mock.Mock()
    mock_get_dst.return_value = mock_dst
    backup_mysql("hourly", config)
    # Check call_count explicitly: on mock releases that lack
    # ``assert_called_once`` the attribute is auto-created by Mock and the
    # "assertion" silently passes without checking anything.
    assert mock_dst.status.call_count == 1
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_paginator():
    """Return a ``Mock`` whose spec is ``botocore.client.Paginator``."""
    paginator_spec = botocore.client.Paginator
    return mock.Mock(spec=paginator_spec)
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_as_client(mock_paginator):
    """Return a ``Mock`` whose ``get_paginator()`` yields *mock_paginator*."""
    return mock.Mock(**{'get_paginator.return_value': mock_paginator})
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_ec2():
    """Return a ``Mock`` that uses ``botocore.client`` itself as its spec."""
    client_spec = botocore.client
    return mock.Mock(spec=client_spec)
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_ec2_client():
    """Return a new, unconfigured ``mock.Mock`` on every call."""
    client = mock.Mock()
    return client
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_ssh_connection():
    """Return a ``Mock`` whose spec is ``SSHClient``."""
    connection = mock.Mock(spec=SSHClient)
    return connection
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_aws_manager():
    """Return a ``Mock`` whose spec is ``AwsManager``."""
    manager = mock.Mock(spec=AwsManager)
    return manager
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def mock_instance_manager():
    """Return a ``Mock`` whose spec is ``InstanceSshManager``."""
    manager = mock.Mock(spec=InstanceSshManager)
    return manager
项目:asg-rolling-upgrade    作者:crunch-accounting    | 项目源码 | 文件源码
def test_rum_gets_list_of_instances_to_upgrade(
    rolling_upgrade_manager,
    mock_aws_manager
):
    """Only instances whose comparison yields a non-empty diff are returned.

    Three scenarios are run against the same three instances; each one
    sets the per-instance results of ``compare_instance_to_config`` and
    lists the indexes of the instances expected back.
    """
    Instance = namedtuple('Instance', ['id'])
    instances = [Instance(name)
                 for name in ('instance1', 'instance2', 'instance3')]

    compare = mock.Mock()
    rolling_upgrade_manager.compare_instance_to_config = compare
    mock_aws_manager.get_instances_for_asg.return_value = instances

    # (successive compare results, indexes of instances expected back)
    scenarios = (
        (([], [], ['diff']), (2,)),
        ((['diff1', 'diff2'], [], ['diff']), (0, 2)),
        (([], [], []), ()),
    )
    for compare_results, expected_indexes in scenarios:
        compare.side_effect = compare_results
        result = rolling_upgrade_manager.get_instances_to_upgrade(
            "test-asg", {})

        assert len(result) == len(expected_indexes)
        for index in expected_indexes:
            assert instances[index] in result

    # One lookup of the ASG's instances per scenario.
    mock_aws_manager.get_instances_for_asg.assert_has_calls(
        (mock.call('test-asg'),) * 3)
项目:meteosangue    作者:meteosangue    | 项目源码 | 文件源码
def test_poster_with_tags_facebook(self, facebook_mock, phantom_driver):
        """Posting to Facebook uploads one tagged photo and no wall post.

        The page body served by the fake PhantomJS driver is read from the
        ``data/crs_page.html`` fixture next to this test module.
        """
        page_path = os.path.join(os.path.dirname(__file__), 'data', 'crs_page.html')
        # Read through a context manager so the fixture handle is closed
        # deterministically instead of being left to garbage collection.
        with open(page_path) as page:
            mock_body = page.read()

        posters_register._posters = []
        posters_register.register_poster(facebook_status, 'facebook_done')

        m = mock.Mock()

        # NOTE(review): a Mock used as side_effect is *called* on each
        # put_photo() and its own return value is handed back, so the
        # id='1' set here is not on the returned object. Possibly
        # return_value was intended -- confirm before changing.
        m.put_photo.side_effect = mock.Mock(id='1')

        facebook_mock.return_value = m

        phantom_driver.return_value = MockPhantomJS(mock_body)

        self.assertEqual(len(Log.objects.all()), 0)

        meteo_fake_status = '?? Emergenza: A+ , A- , O+ , O- , B- , AB- , AB+\n'
        meteo_upload_fake = os.path.join(os.path.dirname(__file__), 'data', 'meteo_fake_upload.png')

        facebook_status(meteo_fake_status, meteo_upload_fake)

        self.assertEqual(m.put_wall_post.call_count, 0)
        self.assertEqual(m.put_photo.call_count, 1)
        self.assertEqual(
            m.put_photo.call_args[1]['tags'],
            '[{"tag_uid": "andrea.stagi"}, {"tag_uid": "patrick.arminio"}]'
        )
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def core_mock(self):
        """Return a new, unconfigured ``mock.Mock`` on every call."""
        fresh = mock.Mock()
        return fresh
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def setUp(self):
        """Build an ``SnmpClient`` around a mocked ``oneliner_cmdgen``.

        Also collects ``(class name, instance)`` pairs for every
        ``ErrorIndication`` instance found in the ``errind`` namespace.
        """
        super(TestSnmpClient, self).setUp()
        self.command_generator_mock = mock.Mock()
        self.pysnmp_mock = mock.Mock()
        self.oneliner_cmdgen = mock.Mock(**{
            'CommandGenerator.return_value': self.command_generator_mock,
            'CommunityData.return_value': sentinel.community_data,
            'UdpTransportTarget.return_value': sentinel.udp_transport_target,
        })

        client_kwargs = dict(
            oneliner_cmdgen=self.oneliner_cmdgen,
            host=sentinel.hostname,
            port=sentinel.port,
            community=sentinel.community,
            timeout=sentinel.timeout,
            retries=sentinel.retries,
        )
        self.snmp_client = snmp_client.SnmpClient(**client_kwargs)

        indications = []
        for attr_name in dir(errind):
            # Look the name up in errind's own __dict__ only.
            candidate = errind.__dict__.get(attr_name)
            if isinstance(candidate, ErrorIndication):
                indications.append((candidate.__class__.__name__, candidate))
        self.all_error_indications = indications
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_on(self, mock_open):
        """``power_on`` opens 'hello', creates the domain, then closes."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_on('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.create.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_off(self, mock_open):
        """``power_off`` opens 'hello', destroys the domain, then closes."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_off('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.destroy.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_get_power_state_on(self, mock_open):
        """``isActive() == 1`` is reported as ``drivers.POWER_ON``."""
        domain = mock.Mock(**{'isActive.return_value': 1})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        state = self.driver.get_power_state('domainA')
        self.assertEqual(drivers.POWER_ON, state)

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.isActive.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_get_power_state_off(self, mock_open):
        """``isActive() == 0`` is reported as ``drivers.POWER_OFF``."""
        domain = mock.Mock(**{'isActive.return_value': 0})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        state = self.driver.get_power_state('domainA')
        self.assertEqual(drivers.POWER_OFF, state)

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.isActive.assert_called_with()
        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_get_power_state_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.get_power_state('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_on_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.power_on('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_off_domain_not_found(self, mock_open):
        """A failing lookup surfaces as ``DeviceNotFound`` and still closes."""
        connection = mock.Mock()
        lookup_failure = libvirt.libvirtError('virDomainLookupByName() failed',
                                              conn=connection)
        connection.lookupByName.side_effect = lookup_failure
        mock_open.return_value = connection

        with self.assertRaises(drivers.DeviceNotFound):
            self.driver.power_off('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_on_generic_error_goes_through(self, mock_open):
        """An arbitrary error from ``create`` propagates; close still runs."""
        domain = mock.Mock(**{'create.side_effect': SpecificException()})
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        with self.assertRaises(SpecificException):
            self.driver.power_on('domainA')

        connection.close.assert_called_with()
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def test_power_on_stays_connected(self, mock_open):
        """``power_on`` creates the domain but leaves the connection open."""
        domain = mock.Mock()
        connection = mock.Mock(**{'lookupByName.return_value': domain})
        mock_open.return_value = connection

        self.driver.power_on('domainA')

        mock_open.assert_called_with('hello')
        connection.lookupByName.assert_called_with('domainA')
        domain.create.assert_called_with()
        # close() must never have been invoked in this configuration.
        close_was_called = connection.close.called
        self.assertFalse(close_was_called)
项目:virtualpdu    作者:internap    | 项目源码 | 文件源码
def setUp(self):
        """Create the PDU under test with a mocked core and start its harness."""
        super(PDUTestCase, self).setUp()
        self.community = 'test1212'
        self.core_mock = mock.Mock()
        self.pdu = self.pdu_class(name='test_pdu', core=self.core_mock)

        # Third positional argument: a random integer in 20000..30000
        # (presumably the listen port -- confirm against SNMPPDUHarness).
        random_number = random.randint(20000, 30000)
        self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness(
            self.pdu, '127.0.0.1', random_number, self.community)
        self.pdu_test_harness.start()