Python nose.tools 模块,assert_raises() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nose.tools.assert_raises()

项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_show_anon_private_dataset(self):

        user = factories.User()
        org = factories.Organization()
        dataset = factories.Dataset(
            owner_org=org['id'], resources=[factories.Resource()],
            private=True)

        context = {
            'user': user['name'],
            'model': model
        }

        assert_raises(t.NotAuthorized,
                      call_auth, 'resource_validation_run', context=context,
                      resource_id=dataset['resources'][0]['id'])
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_validation_fails_on_upload(self, mock_open):

        invalid_file = StringIO.StringIO()
        invalid_file.write(INVALID_CSV)

        mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

        dataset = factories.Dataset()

        invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=invalid_stream):

            with assert_raises(t.ValidationError) as e:

                    call_action(
                        'resource_create',
                        package_id=dataset['id'],
                        format='CSV',
                        upload=mock_upload
                    )

        assert 'validation' in e.exception.error_dict
        assert 'missing-value' in str(e.exception)
        assert 'Row 2 has a missing value in column 4' in str(e.exception)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_local_lun_001(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # case1:
    # Add two local_lun without type
    devices = [
                {"device_name": "local_lun",
                "device_order": "1",
                },
                {"device_name": "local_lun",
                "device_order": "2",
                },
    ]

    expected_error_message = "_local_lun_add failed, error: Instance of Local Lun already added at order '1'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_local_lun_002(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # case2:
    # Add first local_lun without type
    # Add next local_lun with type
    devices = [
                {"device_name": "local_lun",
                 "device_order": "1",
                },
                {"device_name": "local_lun",
                 "device_order": "2",
                 "type": "primary",
                 "lun_name": "test_primary"
                },
    ]

    expected_error_message = "_local_lun_add failed, error: Instance of Local Lun already added at order '1'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_local_lun_003(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # case3:
    # Add first local_lun with type
    # Add next local_lun with type
    devices = [
                {"device_name": "local_lun",
                 "device_order": "1",
                 "type": "primary",
                 "lun_name": "primary"
                },
                {"device_name": "local_lun",
                 "device_order": "2",
                },
    ]

    expected_error_message = "_local_lun_add failed, error: Required parameter 'lun_name' or 'type' missing."
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_local_jbod_002(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # local_jbod_case2
    # Add local_jbod without slot_number
    devices = [
                {"device_name": "local_jbod",
                 "device_order": "1",
                },
    ]

    expected_error_message = "_local_jbod_add() takes exactly 3 arguments (2 given)"
    with assert_raises(TypeError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_embedded_disk_001(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # local_embedded_disk_case1:
    # Add two local_embedded_disk  without type
    devices = [
                {"device_name": "embedded_disk",
                "device_order": "1",
                },
                {"device_name": "embedded_disk",
                "device_order": "2",
                },
    ]

    expected_error_message = "_local_embedded_disk_add failed, error: Instance of Local Embedded Disk already added at order '1'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_embedded_disk_002(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # local_embedded_disk_case2:
    # Add first local_embedded_disk without type
    # Add next local_embedded_disk with type
    devices = [
                {"device_name": "embedded_disk",
                "device_order": "1",
                },
                {"device_name": "embedded_disk",
                "device_order": "2",
                 "type": "primary",
                 "slot_number": "1"
                },
    ]

    expected_error_message = "_local_embedded_disk_add failed, error: Instance of Local Embedded Disk already added at order '1'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_embedded_disk_003(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # local_embedded_disk_case3:
    # Add first local_embedded_disk with type
    # Add next local_embedded_disk without type
    devices = [
                {"device_name": "embedded_disk",
                "device_order": "1",
                 "type": "primary",
                 "slot_number": "1"
                },
                {"device_name": "embedded_disk",
                "device_order": "2",
                },
    ]

    expected_error_message = "_local_embedded_disk_add failed, error: Required parameter 'slot_number' or 'type' missing."
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_lan_001(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # lan_case1:
    # Add first lan without vnic_name and with type
    devices = [
                {"device_name": "lan",
                "device_order": "1",
                },
    ]

    expected_error_message = "_lan_device_add() takes exactly 3 arguments (2 given)"
    with assert_raises(TypeError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_iscsi_001(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # iscsi_case1:
    # Add first iscsi without vnic_name and with type
    devices = [
                {"device_name": "iscsi",
                "device_order": "1",
                 "type": "primary",
                },
    ]

    expected_error_message = "_iscsi_device_add() got an unexpected keyword argument 'type'"
    with assert_raises(TypeError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_iscsi_002(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # iscsi_case2:
    # Add first iscsi without vnic_name and type
    devices = [
                {"device_name": "iscsi",
                "device_order": "1",
                },
    ]

    expected_error_message = "_iscsi_device_add() takes exactly 3 arguments (2 given)"
    with assert_raises(TypeError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_special_case_01(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # special_case1:
    # Add first local_disk
    # Add another local_device
    devices = [
                {"device_name": "local_disk",
                "device_order": "1",
                },
                {"device_name": "sdcard",
                "device_order": "2",
                },
    ]

    expected_error_message = "_device_add failed, error: local_disk cannot be added with other local devices."
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_special_case_02(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None


    # special_case2:
    # Add first cd_dvd
    # Add another cd_dvd_local
    devices = [
                {"device_name": "cd_dvd",
                "device_order": "1",
                },
                {"device_name": "cd_dvd_local",
                "device_order": "2",
                },
    ]

    expected_error_message = "_device_add failed, error: 'cd_dvd' or 'cd_dvd_local, cd_dvd_remote'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_special_case_03(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # special_case3:
    # Add first floppy
    # Add another floppy_local
    devices = [
                {"device_name": "floppy",
                "device_order": "1",
                },
                {"device_name": "floppy_local",
                "device_order": "2",
                },
    ]

    expected_error_message = "_device_add failed, error: 'floppy' or 'floppy_local, floppy_remote'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:ucsm_apis    作者:CiscoUcs    | 项目源码 | 文件源码
def test_boot_special_case_05(mock_login, mock_query_dn, mock_order_clear):
    mock_login.return_value = True
    bp = LsbootPolicy("org-root", name="test")
    mock_query_dn.return_value = bp
    mock_order_clear.return_value = None

    # special_case4:
    # Add virtual_drive
    # Add virtual_drive
    devices = [
                {"device_name": "virtual_drive",
                "device_order": "1",
                },
                {"device_name": "virtual_drive",
                "device_order": "2",
                },
    ]

    expected_error_message = "_vmedia_device_add failed, error: Device 'virtual_drive' already exist at order '1'"
    with assert_raises(UcsOperationError) as error:
         boot_policy_order_set(handle, boot_policy_dn, devices)
    assert_equal(error.exception.message, expected_error_message)
项目:django-nameko    作者:and3rson    | 项目源码 | 文件源码
def test_cluster_proxy_pool():
    with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
        pool = rpc.ClusterRpcProxyPool(dict(), pool_size=2)
        pool.start()
        assert pool.queue.qsize() == 2

        with pool.next() as client:
            assert pool.queue.qsize() == 1

            client.foo.bar()
            assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls

            with pool.next():
                assert pool.queue.qsize() == 0

                tools.assert_raises(queue_six.Empty, pool.next, timeout=1)

            assert pool.queue.qsize() == 1
        assert pool.queue.qsize() == 2

        pool.stop()
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_create_title_missing(self):
        '''Missing title raises ValidationError.'''
        sysadmin = Sysadmin()

        data_dict = self._make_create_data_dict()
        # remove title key
        del data_dict['title']
        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_create')(
                context={'user': sysadmin['name']},
                data_dict=data_dict
            )
        error_dict = cm.exception.error_dict['title']
        nosetools.assert_true("Missing value"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_create_view_type_missing(self):
        '''Missing view_type raises ValidationError.'''
        sysadmin = Sysadmin()

        data_dict = self._make_create_data_dict()
        # remove title key
        del data_dict['view_type']
        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_create')(
                context={'user': sysadmin['name']},
                data_dict=data_dict
            )
        error_dict = cm.exception.error_dict['view_type']
        nosetools.assert_true("Missing value"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_create_config_must_be_json(self):
        '''If present, config must be a json string.'''
        sysadmin = Sysadmin()

        data_dict = self._make_create_data_dict()
        # replace config with non-json string
        data_dict['config'] = "I'm not json."
        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_create')(
                context={'user': sysadmin['name']},
                data_dict=data_dict
            )
        error_dict = cm.exception.error_dict['config']
        nosetools.assert_true("Could not parse as valid JSON"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_update_no_id(self):
        '''Updating with a missing powerview id raises error.'''

        sysadmin = Sysadmin()

        powerview_dict = factories.PowerView()
        del powerview_dict['id']

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_update')(
                context={'user': sysadmin['name']},
                data_dict=powerview_dict
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Missing value"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_update_nonexisting_id(self):
        '''Updating with a non-existing powerview id raises error.'''

        sysadmin = Sysadmin()

        powerview_dict = factories.PowerView()
        powerview_dict['id'] = 'non-existing-id'

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_update')(
                context={'user': sysadmin['name']},
                data_dict=powerview_dict
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Not found: PowerView"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_show_no_id(self):
        '''Calling powerview show with a missing id raises error.'''

        sysadmin = Sysadmin()

        factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_show')(
                context={'user': sysadmin['name']},
                data_dict={}
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Missing value"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_show_nonexisting_id(self):
        '''Calling powerview show with a non-existing id raises error.'''

        sysadmin = Sysadmin()

        factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_show')(
                context={'user': sysadmin['name']},
                data_dict={'id': 'non-existin-id'}
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Not found: PowerView"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_resource_list_nonexisting_id(self):
        '''Calling powerview resource list with a non-existing id raises
        error.'''
        sysadmin = Sysadmin()

        factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_resource_list')(
                context={'user': sysadmin['name']},
                data_dict={'id': 'non-existin-id'}
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Not found: PowerView"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_delete_no_id(self):
        '''Calling powerview delete with a missing id raises error.'''

        sysadmin = Sysadmin()

        factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_delete')(
                context={'user': sysadmin['name']},
                data_dict={}
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Missing value"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_delete_nonexisting_id(self):
        '''Calling powerview delete with a non-existing id raises error.'''

        sysadmin = Sysadmin()

        factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_delete')(
                context={'user': sysadmin['name']},
                data_dict={'id': 'non-existin-id'}
            )
        error_dict = cm.exception.error_dict['id']
        nosetools.assert_true("Not found: PowerView"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_add_resource_nonexisting_resource_id(self):
        '''Calling powerview_add_resource with nonexisting resource id raises
        ValidationError.'''
        sysadmin = Sysadmin()
        powerview = factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_add_resource')(
                context={'user': sysadmin['name']},
                data_dict={'id': powerview['id'],
                           'resource_id': 'non-existing-id'}
            )
        error_dict = cm.exception.error_dict['resource_id']
        nosetools.assert_true("Not found: Resource"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_remove_resource_nonexisting_resource_id(self):
        '''Calling powerview_remove_resource with nonexisting resource id raises
        ValidationError.'''
        sysadmin = Sysadmin()
        powerview = factories.PowerView()

        with nosetools.assert_raises(ValidationError) as cm:
            toolkit.get_action('powerview_remove_resource')(
                context={'user': sysadmin['name']},
                data_dict={'id': powerview['id'],
                           'resource_id': 'non-existing-id'}
            )
        error_dict = cm.exception.error_dict['resource_id']
        nosetools.assert_true("Not found: Resource"
                              in error_dict,
                              "Expected string not in exception message.")
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_create_normal_user_allow_create_private_resources(self):
        '''
        Calling powerview create with normal user when allow_user_create is
        true, with unauthorized resources, raises NotAuthorized.
        '''
        org = factories.Organization()
        dataset = factories.Dataset(owner_org=org['id'], private="true")
        r1 = factories.Resource(package_id=dataset['id'])
        r2 = factories.Resource(package_id=dataset['id'])

        a_user = factories.User()

        pv_dict = self._make_create_data_dict(resources=[r1['id'], r2['id']])
        context = {'user': a_user['name'], 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_create', context=context,
                                **pv_dict)
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_add_resource_normal_user(self):
        '''
        Calling powerview add resource for a normal user raises NotAuthorized.
        '''
        a_user = factories.User()
        dataset = factories.Dataset()
        r1 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView(user=a_user)

        context = {'user': a_user['name'], 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_add_resource',
                                context=context,
                                powerview_id=powerview['id'],
                                resource_id=r1['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_add_resource_anon_user_allow_create(self):
        '''
        Calling powerview add resource for an anon user still raises
        NotAuthorized.
        '''
        dataset = factories.Dataset()
        r1 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView()

        context = {'user': '', 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_add_resource',
                                context=context,
                                powerview_id=powerview['id'],
                                resource_id=r1['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_remove_resource_normal_user(self):
        '''
        Calling powerview remove resource for a normal user raises
        NotAuthorized.
        '''
        a_user = factories.User()
        dataset = factories.Dataset()
        r1 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView(user=a_user,
                                                  resources=[r1['id']])

        context = {'user': a_user['name'], 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_remove_resource',
                                context=context,
                                powerview_id=powerview['id'],
                                resource_id=r1['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_remove_resource_anon_user(self):
        '''
        Calling powerview remove resource for an anon user raises
        NotAuthorized.
        '''
        dataset = factories.Dataset()
        r1 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView(resources=[r1['id']])

        context = {'user': '', 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_remove_resource',
                                context=context,
                                powerview_id=powerview['id'],
                                resource_id=r1['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_remove_resource_anon_user_allow_create(self):
        '''
        Calling powerview remove resource for an anon user still raises
        NotAuthorized.
        '''
        dataset = factories.Dataset()
        r1 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView(resources=[r1['id']])

        context = {'user': '', 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_remove_resource',
                                context=context,
                                powerview_id=powerview['id'],
                                resource_id=r1['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_resource_list_normal_user_private_resources(self):
        '''
        Calling powerview resource list with a normal user on a powerview
        containing private resources raises NotAuthorized.
        '''
        a_user = factories.User()
        org = factories.Organization()
        dataset = factories.Dataset(owner_org=org['id'],
                                    private="true")
        r1 = factories.Resource(package_id=dataset['id'])
        r2 = factories.Resource(package_id=dataset['id'])

        powerview = powerview_factories.PowerView(private='no',
                                                  resources=[r1['id'],
                                                             r2['id']])

        context = {'user': a_user['name'], 'model': model}
        nosetools.assert_raises(NotAuthorized, helpers.call_auth,
                                'ckanext_powerview_resource_list',
                                context=context,
                                id=powerview['id'])
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_show_with_private_resource_not_authorized(self):
        '''Calling powerview_show will raise NotAuthorized if powerview
        contains a private resource for user.'''
        org = Organization()
        user = User()
        dataset = Dataset(owner_org=org['id'], private="true")
        r1 = Resource(package_id=dataset['id'])
        r2 = Resource(package_id=dataset['id'])

        p1 = factories.PowerView(private=False,
                                 resources=[r1['id'],
                                            r2['id']])

        powerview_show_action = toolkit.get_action('powerview_show')
        context = {'user': user['name']}
        nosetools.assert_raises(toolkit.NotAuthorized, powerview_show_action,
                                context=context, data_dict={'id': p1['id']})
项目:ckanext-powerview    作者:OCHA-DAP    | 项目源码 | 文件源码
def test_powerview_remove_resource_unassociated_resource(self):
        '''Calling powerview_remove_resource with an unassociated resource id
        raises an error.'''
        sysadmin = Sysadmin()
        r1 = Resource()
        r2 = Resource()

        powerview_dict = factories.PowerView(resources=[r1['id']])

        with nosetools.assert_raises(ValidationError):
            toolkit.get_action('powerview_remove_resource')(
                context={'user': sysadmin['name']},
                data_dict={
                    'id': powerview_dict['id'],
                    'resource_id': r2['id']
                }
            )
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_engine_execute_errors(self):
        # ensures that SQL errors are reported
        with assert_raises(ProgrammingError):
            with self.connection() as conn:
                conn.execute('SELECT * FROM a_wrong_table').fetchall()

        traces = self.tracer.writer.pop_traces()
        # trace composition
        eq_(len(traces), 1)
        eq_(len(traces[0]), 1)
        span = traces[0][0]
        # span fields
        eq_(span.name, '{}.query'.format(self.VENDOR))
        eq_(span.service, self.SERVICE)
        eq_(span.resource, 'SELECT * FROM a_wrong_table')
        eq_(span.get_tag('sql.db'), self.SQL_DB)
        ok_(span.get_tag('sql.rows') is None)
        self.check_meta(span)
        eq_(span.span_type, 'sql')
        ok_(span.duration > 0)
        # check the error
        eq_(span.error, 1)
        eq_(span.get_tag('error.type'), 'mysql.connector.errors.ProgrammingError')
        ok_("Table 'test.a_wrong_table' doesn't exist" in span.get_tag('error.msg'))
        ok_("Table 'test.a_wrong_table' doesn't exist" in span.get_tag('error.stack'))
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_engine_execute_errors(self):
        # ensures that SQL errors are reported
        with assert_raises(ProgrammingError):
            with self.connection() as conn:
                conn.execute('SELECT * FROM a_wrong_table').fetchall()

        traces = self.tracer.writer.pop_traces()
        # trace composition
        eq_(len(traces), 1)
        eq_(len(traces[0]), 1)
        span = traces[0][0]
        # span fields
        eq_(span.name, '{}.query'.format(self.VENDOR))
        eq_(span.service, self.SERVICE)
        eq_(span.resource, 'SELECT * FROM a_wrong_table')
        eq_(span.get_tag('sql.db'), self.SQL_DB)
        ok_(span.get_tag('sql.rows') is None)
        self.check_meta(span)
        eq_(span.span_type, 'sql')
        ok_(span.duration > 0)
        # check the error
        eq_(span.error, 1)
        ok_('relation "a_wrong_table" does not exist' in span.get_tag('error.msg'))
        ok_('ProgrammingError' in span.get_tag('error.type'))
        ok_('ProgrammingError: relation "a_wrong_table" does not exist' in span.get_tag('error.stack'))
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_engine_execute_errors(self):
        # ensures that SQL errors are reported
        with assert_raises(OperationalError):
            with self.connection() as conn:
                conn.execute('SELECT * FROM a_wrong_table').fetchall()

        traces = self.tracer.writer.pop_traces()
        # trace composition
        eq_(len(traces), 1)
        eq_(len(traces[0]), 1)
        span = traces[0][0]
        # span fields
        eq_(span.name, '{}.query'.format(self.VENDOR))
        eq_(span.service, self.SERVICE)
        eq_(span.resource, 'SELECT * FROM a_wrong_table')
        eq_(span.get_tag('sql.db'), self.SQL_DB)
        ok_(span.get_tag('sql.rows') is None)
        eq_(span.span_type, 'sql')
        ok_(span.duration > 0)
        # check the error
        eq_(span.error, 1)
        eq_(span.get_tag('error.msg'), 'no such table: a_wrong_table')
        ok_('OperationalError' in span.get_tag('error.type'))
        ok_('OperationalError: no such table: a_wrong_table' in span.get_tag('error.stack'))
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_wrapper_incr_safety(self):
        # get the default cache
        cache = caches['default']

        # it should fail not because of our wrapper
        with assert_raises(ValueError) as ex:
            cache.incr('missing_key')

        # the error is not caused by our tracer
        eq_(ex.exception.args[0], "Key 'missing_key' not found")
        # an error trace must be sent
        spans = self.tracer.writer.pop()
        eq_(len(spans), 2)
        span = spans[0]
        eq_(span.resource, 'incr')
        eq_(span.name, 'django.cache')
        eq_(span.span_type, 'cache')
        eq_(span.error, 1)
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_wrapper_decr_safety(self):
        # get the default cache
        cache = caches['default']

        # it should fail not because of our wrapper
        with assert_raises(ValueError) as ex:
            cache.decr('missing_key')

        # the error is not caused by our tracer
        eq_(ex.exception.args[0], "Key 'missing_key' not found")
        # an error trace must be sent
        spans = self.tracer.writer.pop()
        eq_(len(spans), 3)
        span = spans[0]
        eq_(span.resource, 'decr')
        eq_(span.name, 'django.cache')
        eq_(span.span_type, 'cache')
        eq_(span.error, 1)
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_template_renderer_exception(self):
        # it should trace the Template exceptions generation even outside web handlers
        t = template.Template('{% module ModuleThatDoesNotExist() %}')
        with assert_raises(NameError):
            t.generate()

        traces = self.tracer.writer.pop_traces()
        eq_(1, len(traces))
        eq_(1, len(traces[0]))

        template_span = traces[0][0]
        eq_('tornado-web', template_span.service)
        eq_('tornado.template', template_span.name)
        eq_('template', template_span.span_type)
        eq_('render_string', template_span.resource)
        eq_('render_string', template_span.get_tag('tornado.template_name'))
        eq_(1, template_span.error)
        ok_('is not defined' in template_span.get_tag('error.msg'))
        ok_('NameError' in template_span.get_tag('error.stack'))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_resource_delete_editor(self):
        '''Normally organization admins can delete resources
        Our plugin prevents this by blocking delete organization.

        Ensure the delete button is not displayed (as only resource delete
        is checked for showing this)

        '''
        user = factories.User()
        owner_org = factories.Organization(
            users=[{'name': user['id'], 'capacity': 'admin'}]
        )
        dataset = factories.Dataset(owner_org=owner_org['id'])
        resource = factories.Resource(package_id=dataset['id'])
        with assert_raises(logic.NotAuthorized) as e:
            logic.check_access('resource_delete', {'user': user['name']}, {'id': resource['id']})

        assert_equal(e.exception.message, 'User %s not authorized to delete resource %s' % (user['name'], resource['id']))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_remote_groups_only_local(self):
        # Create an existing group
        Group(id='group1-id', name='group1')

        config = {'remote_groups': 'only_local'}
        results_by_guid = run_harvest(
            url='http://localhost:%s' % mock_ckan.PORT,
            harvester=CKANHarvester(),
            config=json.dumps(config))
        assert 'dataset1-id' in results_by_guid

        # Check that the dataset was added to the existing local group
        dataset = call_action('package_show', {}, id=mock_ckan.DATASETS[0]['id'])
        assert_equal(dataset['groups'][0]['id'], mock_ckan.DATASETS[0]['groups'][0]['id'])

        # Check that the other remote group was not created locally
        assert_raises(toolkit.ObjectNotFound, call_action, 'group_show', {},
                      id='remote-group')
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_extension():
    # Debugging information for failures of this test
    print('sys.path:')
    for p in sys.path:
        print(' ', p)
    print('CWD', os.getcwd())

    nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
    daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
    sys.path.insert(0, daft_path)
    try:
        _ip.user_ns.pop('arq', None)
        invalidate_caches()   # Clear import caches
        _ip.magic("load_ext daft_extension")
        nt.assert_equal(_ip.user_ns['arq'], 185)
        _ip.magic("unload_ext daft_extension")
        assert 'arq' not in _ip.user_ns
    finally:
        sys.path.remove(daft_path)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_pop_string():
    f = PlainTextFormatter()
    type_str = '%s.%s' % (C.__module__, 'C')

    with nt.assert_raises(KeyError):
        f.pop(type_str)

    f.for_type(type_str, foo_printer)
    f.pop(type_str)
    with nt.assert_raises(KeyError):
        f.lookup_by_type(C)
    with nt.assert_raises(KeyError):
        f.pop(type_str)

    f.for_type(C, foo_printer)
    nt.assert_is(f.pop(type_str, None), foo_printer)
    with nt.assert_raises(KeyError):
        f.lookup_by_type(C)
    with nt.assert_raises(KeyError):
        f.pop(type_str)
    nt.assert_is(f.pop(type_str, None), None)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def eval_formatter_check(f):
    ns = dict(n=12, pi=math.pi, stuff='hello there', os=os, u=u"café", b="café")
    s = f.format("{n} {n//4} {stuff.split()[0]}", **ns)
    nt.assert_equal(s, "12 3 hello")
    s = f.format(' '.join(['{n//%i}'%i for i in range(1,8)]), **ns)
    nt.assert_equal(s, "12 6 4 3 2 2 1")
    s = f.format('{[n//i for i in range(1,8)]}', **ns)
    nt.assert_equal(s, "[12, 6, 4, 3, 2, 2, 1]")
    s = f.format("{stuff!s}", **ns)
    nt.assert_equal(s, ns['stuff'])
    s = f.format("{stuff!r}", **ns)
    nt.assert_equal(s, repr(ns['stuff']))

    # Check with unicode:
    s = f.format("{u}", **ns)
    nt.assert_equal(s, ns['u'])
    # This decodes in a platform dependent manner, but it shouldn't error out
    s = f.format("{b}", **ns)

    nt.assert_raises(NameError, f.format, '{dne}', **ns)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_get_py_filename():
    os.chdir(TMP_TEST_DIR)
    with make_tempfile('foo.py'):
        nt.assert_equal(path.get_py_filename('foo.py'), 'foo.py')
        nt.assert_equal(path.get_py_filename('foo'), 'foo.py')
    with make_tempfile('foo'):
        nt.assert_equal(path.get_py_filename('foo'), 'foo')
        nt.assert_raises(IOError, path.get_py_filename, 'foo.py')
    nt.assert_raises(IOError, path.get_py_filename, 'foo')
    nt.assert_raises(IOError, path.get_py_filename, 'foo.py')
    true_fn = 'foo with spaces.py'
    with make_tempfile(true_fn):
        nt.assert_equal(path.get_py_filename('foo with spaces'), true_fn)
        nt.assert_equal(path.get_py_filename('foo with spaces.py'), true_fn)
        nt.assert_raises(IOError, path.get_py_filename, '"foo with spaces.py"')
        nt.assert_raises(IOError, path.get_py_filename, "'foo with spaces.py'")