我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 nose.tools.assert_raises()。
def test_show_anon_private_dataset(self):
    """Running validation on a private dataset's resource is not authorized.

    NOTE(review): despite "anon" in the test name, the auth context carries
    the name of a freshly created user -- confirm this is the intended caller.
    """
    caller = factories.User()
    organization = factories.Organization()
    private_dataset = factories.Dataset(
        owner_org=organization['id'],
        resources=[factories.Resource()],
        private=True,
    )

    auth_context = {'user': caller['name'], 'model': model}
    first_resource_id = private_dataset['resources'][0]['id']

    assert_raises(
        t.NotAuthorized,
        call_auth,
        'resource_validation_run',
        context=auth_context,
        resource_id=first_resource_id,
    )
def test_validation_fails_on_upload(self, mock_open):
    """Creating a resource from an invalid CSV upload raises ValidationError."""
    # Fake upload object wrapping the invalid CSV payload.
    upload_buffer = StringIO.StringIO()
    upload_buffer.write(INVALID_CSV)
    fake_upload = MockFieldStorage(upload_buffer, 'invalid.csv')

    target_dataset = factories.Dataset()

    # While patched, every io.open() call returns this stream of invalid bytes.
    patched_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

    with mock.patch('io.open', return_value=patched_stream), \
            assert_raises(t.ValidationError) as raised:
        call_action(
            'resource_create',
            package_id=target_dataset['id'],
            format='CSV',
            upload=fake_upload,
        )

    # The captured exception is only available once the with block has exited.
    assert 'validation' in raised.exception.error_dict
    assert 'missing-value' in str(raised.exception)
    assert 'Row 2 has a missing value in column 4' in str(raised.exception)
def test_boot_local_lun_001(mock_login, mock_query_dn, mock_order_clear):
    """Two local_lun devices without a type are rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # case1: add two local_lun entries, neither of which carries a type.
    requested = [
        {"device_name": "local_lun", "device_order": "1"},
        {"device_name": "local_lun", "device_order": "2"},
    ]
    expected = (
        "_local_lun_add failed, error: "
        "Instance of Local Lun already added at order '1'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_local_lun_002(mock_login, mock_query_dn, mock_order_clear):
    """An untyped local_lun followed by a typed one is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # case2: the first local_lun has no type, the second is a typed primary.
    untyped_lun = {"device_name": "local_lun", "device_order": "1"}
    typed_lun = {
        "device_name": "local_lun",
        "device_order": "2",
        "type": "primary",
        "lun_name": "test_primary",
    }
    requested = [untyped_lun, typed_lun]
    expected = (
        "_local_lun_add failed, error: "
        "Instance of Local Lun already added at order '1'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_local_lun_003(mock_login, mock_query_dn, mock_order_clear):
    """A typed local_lun followed by an untyped one is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # case3: the first local_lun is a typed primary, the second one has
    # neither a type nor a lun_name.
    typed_lun = {
        "device_name": "local_lun",
        "device_order": "1",
        "type": "primary",
        "lun_name": "primary",
    }
    untyped_lun = {"device_name": "local_lun", "device_order": "2"}
    requested = [typed_lun, untyped_lun]
    expected = (
        "_local_lun_add failed, error: "
        "Required parameter 'lun_name' or 'type' missing."
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_local_jbod_002(mock_login, mock_query_dn, mock_order_clear):
    """A local_jbod device without a slot_number fails with TypeError.

    NOTE(review): the expected wording and the ``.message`` attribute are
    Python 2 specifics -- confirm the interpreter this suite targets.
    """
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # local_jbod_case2: the only device omits slot_number.
    requested = [
        {"device_name": "local_jbod", "device_order": "1"},
    ]
    expected = "_local_jbod_add() takes exactly 3 arguments (2 given)"

    with assert_raises(TypeError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_embedded_disk_001(mock_login, mock_query_dn, mock_order_clear):
    """Two embedded_disk devices without a type are rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # local_embedded_disk_case1: two embedded disks, neither with a type.
    requested = [
        {"device_name": "embedded_disk", "device_order": "1"},
        {"device_name": "embedded_disk", "device_order": "2"},
    ]
    expected = (
        "_local_embedded_disk_add failed, error: "
        "Instance of Local Embedded Disk already added at order '1'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_embedded_disk_002(mock_login, mock_query_dn, mock_order_clear):
    """An untyped embedded_disk followed by a typed one is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # local_embedded_disk_case2: the first disk has no type, the second is a
    # typed primary with a slot number.
    untyped_disk = {"device_name": "embedded_disk", "device_order": "1"}
    typed_disk = {
        "device_name": "embedded_disk",
        "device_order": "2",
        "type": "primary",
        "slot_number": "1",
    }
    requested = [untyped_disk, typed_disk]
    expected = (
        "_local_embedded_disk_add failed, error: "
        "Instance of Local Embedded Disk already added at order '1'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_embedded_disk_003(mock_login, mock_query_dn, mock_order_clear):
    """A typed embedded_disk followed by an untyped one is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # local_embedded_disk_case3: the first disk is a typed primary, the second
    # has neither a type nor a slot number.
    typed_disk = {
        "device_name": "embedded_disk",
        "device_order": "1",
        "type": "primary",
        "slot_number": "1",
    }
    untyped_disk = {"device_name": "embedded_disk", "device_order": "2"}
    requested = [typed_disk, untyped_disk]
    expected = (
        "_local_embedded_disk_add failed, error: "
        "Required parameter 'slot_number' or 'type' missing."
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_lan_001(mock_login, mock_query_dn, mock_order_clear):
    """A lan device given only its name and order fails with TypeError.

    NOTE(review): the expected wording and the ``.message`` attribute are
    Python 2 specifics -- confirm the interpreter this suite targets.
    """
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # lan_case1: the only lan device carries no vnic_name (and no type).
    requested = [
        {"device_name": "lan", "device_order": "1"},
    ]
    expected = "_lan_device_add() takes exactly 3 arguments (2 given)"

    with assert_raises(TypeError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_iscsi_001(mock_login, mock_query_dn, mock_order_clear):
    """An iscsi device that passes a type fails with TypeError.

    NOTE(review): ``.message`` on the exception is a Python 2 attribute --
    confirm the interpreter this suite targets.
    """
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # iscsi_case1: the only iscsi device has a type but no vnic_name.
    requested = [
        {"device_name": "iscsi", "device_order": "1", "type": "primary"},
    ]
    expected = "_iscsi_device_add() got an unexpected keyword argument 'type'"

    with assert_raises(TypeError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_iscsi_002(mock_login, mock_query_dn, mock_order_clear):
    """An iscsi device given only its name and order fails with TypeError.

    NOTE(review): the expected wording and the ``.message`` attribute are
    Python 2 specifics -- confirm the interpreter this suite targets.
    """
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # iscsi_case2: the only iscsi device has neither vnic_name nor type.
    requested = [
        {"device_name": "iscsi", "device_order": "1"},
    ]
    expected = "_iscsi_device_add() takes exactly 3 arguments (2 given)"

    with assert_raises(TypeError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_special_case_01(mock_login, mock_query_dn, mock_order_clear):
    """local_disk combined with another local device is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # special_case1: local_disk first, then a second local device (sdcard).
    requested = [
        {"device_name": "local_disk", "device_order": "1"},
        {"device_name": "sdcard", "device_order": "2"},
    ]
    expected = (
        "_device_add failed, error: "
        "local_disk cannot be added with other local devices."
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_special_case_02(mock_login, mock_query_dn, mock_order_clear):
    """cd_dvd combined with cd_dvd_local is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # special_case2: cd_dvd first, then cd_dvd_local.
    requested = [
        {"device_name": "cd_dvd", "device_order": "1"},
        {"device_name": "cd_dvd_local", "device_order": "2"},
    ]
    expected = (
        "_device_add failed, error: "
        "'cd_dvd' or 'cd_dvd_local, cd_dvd_remote'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_special_case_03(mock_login, mock_query_dn, mock_order_clear):
    """floppy combined with floppy_local is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # special_case3: floppy first, then floppy_local.
    requested = [
        {"device_name": "floppy", "device_order": "1"},
        {"device_name": "floppy_local", "device_order": "2"},
    ]
    expected = (
        "_device_add failed, error: "
        "'floppy' or 'floppy_local, floppy_remote'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_boot_special_case_05(mock_login, mock_query_dn, mock_order_clear):
    """Requesting virtual_drive twice is rejected."""
    mock_login.return_value = True
    mock_query_dn.return_value = LsbootPolicy("org-root", name="test")
    mock_order_clear.return_value = None

    # special case: the same virtual_drive device at order 1 and order 2.
    requested = [
        {"device_name": "virtual_drive", "device_order": "1"},
        {"device_name": "virtual_drive", "device_order": "2"},
    ]
    expected = (
        "_vmedia_device_add failed, error: "
        "Device 'virtual_drive' already exist at order '1'"
    )

    with assert_raises(UcsOperationError) as caught:
        boot_policy_order_set(handle, boot_policy_dn, requested)

    assert_equal(caught.exception.message, expected)
def test_cluster_proxy_pool():
    """Check the queue accounting of ClusterRpcProxyPool as proxies are used.

    With ClusterRpcProxy patched out, a pool of size 2 is started and the
    queue size is asserted at each step: full, one proxy checked out, both
    checked out (a further ``next`` raises ``Empty``), then back up to full
    as each ``with`` block exits.

    The pool is stopped in a ``finally`` clause so that a failing assertion
    does not leave a started pool behind.
    """
    with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
        pool = rpc.ClusterRpcProxyPool(dict(), pool_size=2)
        pool.start()
        try:
            assert pool.queue.qsize() == 2

            with pool.next() as client:
                assert pool.queue.qsize() == 1

                # Calls made on the checked-out client must reach the
                # (mocked) proxy created by the pool.
                client.foo.bar()
                assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls

                with pool.next():
                    assert pool.queue.qsize() == 0

                    # Pool exhausted: a third checkout times out.
                    tools.assert_raises(queue_six.Empty, pool.next, timeout=1)

                # Leaving the inner block hands its proxy back.
                assert pool.queue.qsize() == 1

            # Leaving the outer block hands the last proxy back.
            assert pool.queue.qsize() == 2
        finally:
            pool.stop()
def test_powerview_create_title_missing(self):
    '''Creating a powerview without a title raises ValidationError.'''
    admin = Sysadmin()

    payload = self._make_create_data_dict()
    # drop the title key
    del payload['title']

    with nosetools.assert_raises(ValidationError) as raised:
        create_powerview = toolkit.get_action('powerview_create')
        create_powerview(context={'user': admin['name']}, data_dict=payload)

    title_errors = raised.exception.error_dict['title']
    nosetools.assert_true(
        "Missing value" in title_errors,
        "Expected string not in exception message.",
    )
def test_powerview_create_view_type_missing(self):
    '''Creating a powerview without a view_type raises ValidationError.'''
    admin = Sysadmin()

    payload = self._make_create_data_dict()
    # drop the view_type key
    del payload['view_type']

    with nosetools.assert_raises(ValidationError) as raised:
        create_powerview = toolkit.get_action('powerview_create')
        create_powerview(context={'user': admin['name']}, data_dict=payload)

    view_type_errors = raised.exception.error_dict['view_type']
    nosetools.assert_true(
        "Missing value" in view_type_errors,
        "Expected string not in exception message.",
    )
def test_powerview_create_config_must_be_json(self):
    '''A config value that is not a JSON string raises ValidationError.'''
    admin = Sysadmin()

    payload = self._make_create_data_dict()
    # swap the config for a string that is not JSON
    payload['config'] = "I'm not json."

    with nosetools.assert_raises(ValidationError) as raised:
        create_powerview = toolkit.get_action('powerview_create')
        create_powerview(context={'user': admin['name']}, data_dict=payload)

    config_errors = raised.exception.error_dict['config']
    nosetools.assert_true(
        "Could not parse as valid JSON" in config_errors,
        "Expected string not in exception message.",
    )
def test_powerview_update_no_id(self):
    '''Updating a powerview without supplying its id raises ValidationError.'''
    admin = Sysadmin()

    payload = factories.PowerView()
    del payload['id']

    with nosetools.assert_raises(ValidationError) as raised:
        update_powerview = toolkit.get_action('powerview_update')
        update_powerview(context={'user': admin['name']}, data_dict=payload)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Missing value" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_update_nonexisting_id(self):
    '''Updating a powerview whose id does not exist raises ValidationError.'''
    admin = Sysadmin()

    payload = factories.PowerView()
    payload['id'] = 'non-existing-id'

    with nosetools.assert_raises(ValidationError) as raised:
        update_powerview = toolkit.get_action('powerview_update')
        update_powerview(context={'user': admin['name']}, data_dict=payload)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Not found: PowerView" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_show_no_id(self):
    '''powerview_show called without an id raises ValidationError.'''
    admin = Sysadmin()
    # A powerview exists, but the request below does not reference it.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        show_powerview = toolkit.get_action('powerview_show')
        show_powerview(context={'user': admin['name']}, data_dict={})

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Missing value" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_show_nonexisting_id(self):
    '''powerview_show called with an unknown id raises ValidationError.'''
    admin = Sysadmin()
    # A powerview exists, but the request below uses a different id.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        show_powerview = toolkit.get_action('powerview_show')
        show_powerview(
            context={'user': admin['name']},
            data_dict={'id': 'non-existin-id'},
        )

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Not found: PowerView" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_resource_list_nonexisting_id(self):
    '''powerview_resource_list with an unknown id raises ValidationError.'''
    admin = Sysadmin()
    # A powerview exists, but the request below uses a different id.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        list_resources = toolkit.get_action('powerview_resource_list')
        list_resources(
            context={'user': admin['name']},
            data_dict={'id': 'non-existin-id'},
        )

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Not found: PowerView" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_delete_no_id(self):
    '''powerview_delete called without an id raises ValidationError.'''
    admin = Sysadmin()
    # A powerview exists, but the request below does not reference it.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        delete_powerview = toolkit.get_action('powerview_delete')
        delete_powerview(context={'user': admin['name']}, data_dict={})

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Missing value" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_delete_nonexisting_id(self):
    '''powerview_delete called with an unknown id raises ValidationError.'''
    admin = Sysadmin()
    # A powerview exists, but the request below uses a different id.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        delete_powerview = toolkit.get_action('powerview_delete')
        delete_powerview(
            context={'user': admin['name']},
            data_dict={'id': 'non-existin-id'},
        )

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true(
        "Not found: PowerView" in id_errors,
        "Expected string not in exception message.",
    )
def test_powerview_add_resource_nonexisting_resource_id(self):
    '''powerview_add_resource with an unknown resource id raises
    ValidationError.'''
    admin = Sysadmin()
    existing_powerview = factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        add_resource = toolkit.get_action('powerview_add_resource')
        add_resource(
            context={'user': admin['name']},
            data_dict={
                'id': existing_powerview['id'],
                'resource_id': 'non-existing-id',
            },
        )

    resource_errors = raised.exception.error_dict['resource_id']
    nosetools.assert_true(
        "Not found: Resource" in resource_errors,
        "Expected string not in exception message.",
    )
def test_powerview_remove_resource_nonexisting_resource_id(self):
    '''powerview_remove_resource with an unknown resource id raises
    ValidationError.'''
    admin = Sysadmin()
    existing_powerview = factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        remove_resource = toolkit.get_action('powerview_remove_resource')
        remove_resource(
            context={'user': admin['name']},
            data_dict={
                'id': existing_powerview['id'],
                'resource_id': 'non-existing-id',
            },
        )

    resource_errors = raised.exception.error_dict['resource_id']
    nosetools.assert_true(
        "Not found: Resource" in resource_errors,
        "Expected string not in exception message.",
    )
def test_powerview_create_normal_user_allow_create_private_resources(self):
    '''
    A normal user creating a powerview that lists resources of a private
    dataset is refused with NotAuthorized (scenario: allow_user_create is
    true).
    '''
    organization = factories.Organization()
    private_dataset = factories.Dataset(
        owner_org=organization['id'], private="true")
    private_resources = [
        factories.Resource(package_id=private_dataset['id'])
        for _ in range(2)
    ]

    normal_user = factories.User()

    create_kwargs = self._make_create_data_dict(
        resources=[res['id'] for res in private_resources])
    auth_context = {'user': normal_user['name'], 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_create',
        context=auth_context,
        **create_kwargs
    )
def test_powerview_add_resource_normal_user(self):
    '''
    A normal user adding a resource to a powerview is refused with
    NotAuthorized.
    '''
    normal_user = factories.User()
    some_dataset = factories.Dataset()
    resource = factories.Resource(package_id=some_dataset['id'])
    user_powerview = powerview_factories.PowerView(user=normal_user)

    auth_context = {'user': normal_user['name'], 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_add_resource',
        context=auth_context,
        powerview_id=user_powerview['id'],
        resource_id=resource['id'],
    )
def test_powerview_add_resource_anon_user_allow_create(self):
    '''
    An anonymous user (empty user name) adding a resource to a powerview is
    still refused with NotAuthorized.
    '''
    some_dataset = factories.Dataset()
    resource = factories.Resource(package_id=some_dataset['id'])
    target_powerview = powerview_factories.PowerView()

    anon_context = {'user': '', 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_add_resource',
        context=anon_context,
        powerview_id=target_powerview['id'],
        resource_id=resource['id'],
    )
def test_powerview_remove_resource_normal_user(self):
    '''
    A normal user removing a resource from a powerview is refused with
    NotAuthorized.
    '''
    normal_user = factories.User()
    some_dataset = factories.Dataset()
    resource = factories.Resource(package_id=some_dataset['id'])
    user_powerview = powerview_factories.PowerView(
        user=normal_user, resources=[resource['id']])

    auth_context = {'user': normal_user['name'], 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_remove_resource',
        context=auth_context,
        powerview_id=user_powerview['id'],
        resource_id=resource['id'],
    )
def test_powerview_remove_resource_anon_user(self):
    '''
    An anonymous user (empty user name) removing a resource from a powerview
    is refused with NotAuthorized.
    '''
    some_dataset = factories.Dataset()
    resource = factories.Resource(package_id=some_dataset['id'])
    target_powerview = powerview_factories.PowerView(
        resources=[resource['id']])

    anon_context = {'user': '', 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_remove_resource',
        context=anon_context,
        powerview_id=target_powerview['id'],
        resource_id=resource['id'],
    )
def test_powerview_remove_resource_anon_user_allow_create(self):
    '''
    An anonymous user (empty user name) removing a resource from a powerview
    is still refused with NotAuthorized.
    '''
    some_dataset = factories.Dataset()
    resource = factories.Resource(package_id=some_dataset['id'])
    target_powerview = powerview_factories.PowerView(
        resources=[resource['id']])

    anon_context = {'user': '', 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_remove_resource',
        context=anon_context,
        powerview_id=target_powerview['id'],
        resource_id=resource['id'],
    )
def test_powerview_resource_list_normal_user_private_resources(self):
    '''
    A normal user listing the resources of a powerview that contains
    resources of a private dataset is refused with NotAuthorized.
    '''
    normal_user = factories.User()
    organization = factories.Organization()
    private_dataset = factories.Dataset(
        owner_org=organization['id'], private="true")
    private_resources = [
        factories.Resource(package_id=private_dataset['id'])
        for _ in range(2)
    ]

    target_powerview = powerview_factories.PowerView(
        private='no',
        resources=[res['id'] for res in private_resources],
    )

    auth_context = {'user': normal_user['name'], 'model': model}

    nosetools.assert_raises(
        NotAuthorized,
        helpers.call_auth,
        'ckanext_powerview_resource_list',
        context=auth_context,
        id=target_powerview['id'],
    )
def test_powerview_show_with_private_resource_not_authorized(self):
    '''powerview_show raises NotAuthorized when the powerview holds
    resources of a private dataset and a plain user asks for it.'''
    organization = Organization()
    plain_user = User()
    private_dataset = Dataset(owner_org=organization['id'], private="true")
    private_resources = [
        Resource(package_id=private_dataset['id']) for _ in range(2)
    ]

    public_powerview = factories.PowerView(
        private=False,
        resources=[res['id'] for res in private_resources],
    )

    show_powerview = toolkit.get_action('powerview_show')
    user_context = {'user': plain_user['name']}

    nosetools.assert_raises(
        toolkit.NotAuthorized,
        show_powerview,
        context=user_context,
        data_dict={'id': public_powerview['id']},
    )
def test_powerview_remove_resource_unassociated_resource(self):
    '''Removing a resource that the powerview does not contain raises
    ValidationError.'''
    admin = Sysadmin()
    attached_resource = Resource()
    unattached_resource = Resource()

    powerview = factories.PowerView(resources=[attached_resource['id']])

    with nosetools.assert_raises(ValidationError):
        remove_resource = toolkit.get_action('powerview_remove_resource')
        remove_resource(
            context={'user': admin['name']},
            data_dict={
                'id': powerview['id'],
                'resource_id': unattached_resource['id'],
            },
        )
def test_engine_execute_errors(self):
    """A failing query raises ProgrammingError and yields one error span."""
    # ensures that SQL errors are reported
    with assert_raises(ProgrammingError):
        with self.connection() as conn:
            conn.execute('SELECT * FROM a_wrong_table').fetchall()

    recorded = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(recorded), 1)
    eq_(len(recorded[0]), 1)
    query_span = recorded[0][0]

    # span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    self.check_meta(query_span)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    eq_(query_span.error, 1)
    eq_(query_span.get_tag('error.type'),
        'mysql.connector.errors.ProgrammingError')
    missing_table = "Table 'test.a_wrong_table' doesn't exist"
    for tag_name in ('error.msg', 'error.stack'):
        ok_(missing_table in query_span.get_tag(tag_name))
def test_engine_execute_errors(self):
    """A failing query raises ProgrammingError and yields one error span."""
    # ensures that SQL errors are reported
    with assert_raises(ProgrammingError):
        with self.connection() as conn:
            conn.execute('SELECT * FROM a_wrong_table').fetchall()

    recorded = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(recorded), 1)
    eq_(len(recorded[0]), 1)
    query_span = recorded[0][0]

    # span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    self.check_meta(query_span)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    eq_(query_span.error, 1)
    error_msg = query_span.get_tag('error.msg')
    ok_('relation "a_wrong_table" does not exist' in error_msg)
    error_type = query_span.get_tag('error.type')
    ok_('ProgrammingError' in error_type)
    error_stack = query_span.get_tag('error.stack')
    ok_('ProgrammingError: relation "a_wrong_table" does not exist'
        in error_stack)
def test_engine_execute_errors(self):
    """A failing query raises OperationalError and yields one error span."""
    # ensures that SQL errors are reported
    with assert_raises(OperationalError):
        with self.connection() as conn:
            conn.execute('SELECT * FROM a_wrong_table').fetchall()

    recorded = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(recorded), 1)
    eq_(len(recorded[0]), 1)
    query_span = recorded[0][0]

    # span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    eq_(query_span.error, 1)
    eq_(query_span.get_tag('error.msg'), 'no such table: a_wrong_table')
    error_type = query_span.get_tag('error.type')
    ok_('OperationalError' in error_type)
    error_stack = query_span.get_tag('error.stack')
    ok_('OperationalError: no such table: a_wrong_table' in error_stack)
def test_wrapper_incr_safety(self):
    """incr() on a missing key raises the cache's error and is traced."""
    # get the default cache
    default_cache = caches['default']

    # the failure must come from the cache itself, not from the wrapper
    with assert_raises(ValueError) as raised:
        default_cache.incr('missing_key')

    # the message is the one produced for a missing key
    eq_(raised.exception.args[0], "Key 'missing_key' not found")

    # an error trace must be sent
    recorded_spans = self.tracer.writer.pop()
    eq_(len(recorded_spans), 2)

    incr_span = recorded_spans[0]
    eq_(incr_span.resource, 'incr')
    eq_(incr_span.name, 'django.cache')
    eq_(incr_span.span_type, 'cache')
    eq_(incr_span.error, 1)
def test_wrapper_decr_safety(self):
    """decr() on a missing key raises the cache's error and is traced."""
    # get the default cache
    default_cache = caches['default']

    # the failure must come from the cache itself, not from the wrapper
    with assert_raises(ValueError) as raised:
        default_cache.decr('missing_key')

    # the message is the one produced for a missing key
    eq_(raised.exception.args[0], "Key 'missing_key' not found")

    # an error trace must be sent
    recorded_spans = self.tracer.writer.pop()
    eq_(len(recorded_spans), 3)

    decr_span = recorded_spans[0]
    eq_(decr_span.resource, 'decr')
    eq_(decr_span.name, 'django.cache')
    eq_(decr_span.span_type, 'cache')
    eq_(decr_span.error, 1)
def test_template_renderer_exception(self):
    """A template that fails while rendering is traced as an error span."""
    # it should trace the Template exceptions generation even outside web
    # handlers
    broken_template = template.Template(
        '{% module ModuleThatDoesNotExist() %}')
    with assert_raises(NameError):
        broken_template.generate()

    recorded = self.tracer.writer.pop_traces()
    eq_(1, len(recorded))
    eq_(1, len(recorded[0]))

    render_span = recorded[0][0]
    eq_('tornado-web', render_span.service)
    eq_('tornado.template', render_span.name)
    eq_('template', render_span.span_type)
    eq_('render_string', render_span.resource)
    eq_('render_string', render_span.get_tag('tornado.template_name'))

    # error details recorded on the span
    eq_(1, render_span.error)
    error_msg = render_span.get_tag('error.msg')
    ok_('is not defined' in error_msg)
    error_stack = render_span.get_tag('error.stack')
    ok_('NameError' in error_stack)
def test_resource_delete_editor(self):
    '''An organization admin is refused when checking resource_delete.

    Organization admins can normally delete resources; the plugin under
    test is expected to block that. Because only the resource_delete check
    decides whether the delete button is shown, a refusal here also means
    the button is not displayed.
    '''
    org_admin = factories.User()
    owning_org = factories.Organization(
        users=[{'name': org_admin['id'], 'capacity': 'admin'}]
    )
    owned_dataset = factories.Dataset(owner_org=owning_org['id'])
    target_resource = factories.Resource(package_id=owned_dataset['id'])

    with assert_raises(logic.NotAuthorized) as raised:
        logic.check_access(
            'resource_delete',
            {'user': org_admin['name']},
            {'id': target_resource['id']},
        )

    expected_message = 'User %s not authorized to delete resource %s' % (
        org_admin['name'], target_resource['id'])
    assert_equal(raised.exception.message, expected_message)
def test_remote_groups_only_local(self):
    """With remote_groups='only_local', no remote group is created locally."""
    # Create an existing group
    Group(id='group1-id', name='group1')

    harvest_config = json.dumps({'remote_groups': 'only_local'})
    source_url = 'http://localhost:%s' % mock_ckan.PORT

    results_by_guid = run_harvest(
        url=source_url,
        harvester=CKANHarvester(),
        config=harvest_config,
    )
    assert 'dataset1-id' in results_by_guid

    # Check that the dataset was added to the existing local group
    remote_dataset = mock_ckan.DATASETS[0]
    harvested = call_action('package_show', {}, id=remote_dataset['id'])
    assert_equal(harvested['groups'][0]['id'],
                 remote_dataset['groups'][0]['id'])

    # Check that the other remote group was not created locally
    assert_raises(
        toolkit.ObjectNotFound,
        call_action,
        'group_show',
        {},
        id='remote-group',
    )
def test_extension():
    """Load and unload daft_extension once its directory is on sys.path."""
    # Debugging information for failures of this test
    print('sys.path:')
    for entry in sys.path:
        print(' ', entry)
    print('CWD', os.getcwd())

    # Before its directory is on sys.path the extension cannot be imported.
    nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")

    extension_dir = os.path.join(os.path.dirname(__file__), "daft_extension")
    sys.path.insert(0, extension_dir)
    try:
        _ip.user_ns.pop('arq', None)
        invalidate_caches()  # Clear import caches
        _ip.magic("load_ext daft_extension")
        nt.assert_equal(_ip.user_ns['arq'], 185)
        _ip.magic("unload_ext daft_extension")
        assert 'arq' not in _ip.user_ns
    finally:
        # Always undo the sys.path change made above.
        sys.path.remove(extension_dir)
def test_pop_string():
    """pop() accepts a dotted type name and raises KeyError when nothing is
    registered, unless a default is supplied."""
    formatter = PlainTextFormatter()
    dotted_name = '%s.%s' % (C.__module__, 'C')

    def assert_nothing_registered():
        # Neither lookup by class nor pop by name finds a printer.
        with nt.assert_raises(KeyError):
            formatter.lookup_by_type(C)
        with nt.assert_raises(KeyError):
            formatter.pop(dotted_name)

    # Nothing registered yet.
    with nt.assert_raises(KeyError):
        formatter.pop(dotted_name)

    # Register by dotted name, then pop by dotted name.
    formatter.for_type(dotted_name, foo_printer)
    formatter.pop(dotted_name)
    assert_nothing_registered()

    # Register by class, then pop by dotted name with a default.
    formatter.for_type(C, foo_printer)
    nt.assert_is(formatter.pop(dotted_name, None), foo_printer)
    assert_nothing_registered()

    # With a default, popping a missing entry returns the default.
    nt.assert_is(formatter.pop(dotted_name, None), None)
def eval_formatter_check(f):
    """Run the shared expression-formatting checks against formatter *f*.

    *f* must provide ``format(template, **namespace)``; each template below
    is formatted against the same namespace and compared with its expected
    text. An undefined name in a template must raise NameError.
    """
    namespace = dict(n=12, pi=math.pi, stuff='hello there', os=os,
                     u=u"café", b="café")

    divisions = ' '.join('{n//%i}' % divisor for divisor in range(1, 8))
    cases = [
        ("{n} {n//4} {stuff.split()[0]}", "12 3 hello"),
        (divisions, "12 6 4 3 2 2 1"),
        ('{[n//i for i in range(1,8)]}', "[12, 6, 4, 3, 2, 2, 1]"),
        ("{stuff!s}", namespace['stuff']),
        ("{stuff!r}", repr(namespace['stuff'])),
        # Check with unicode:
        ("{u}", namespace['u']),
    ]
    for template, expected in cases:
        nt.assert_equal(f.format(template, **namespace), expected)

    # This decodes in a platform dependent manner, but it shouldn't error out
    f.format("{b}", **namespace)

    nt.assert_raises(NameError, f.format, '{dne}', **namespace)
def test_get_py_filename():
    """get_py_filename resolves existing names and raises IOError otherwise."""
    os.chdir(TMP_TEST_DIR)

    # Only foo.py exists: both spellings resolve to it.
    with make_tempfile('foo.py'):
        for requested in ('foo.py', 'foo'):
            nt.assert_equal(path.get_py_filename(requested), 'foo.py')

    # Only foo exists: the bare name resolves, the .py name does not.
    with make_tempfile('foo'):
        nt.assert_equal(path.get_py_filename('foo'), 'foo')
        nt.assert_raises(IOError, path.get_py_filename, 'foo.py')

    # Neither file exists any more.
    for requested in ('foo', 'foo.py'):
        nt.assert_raises(IOError, path.get_py_filename, requested)

    # File names containing spaces resolve; quoted forms do not.
    spaced_name = 'foo with spaces.py'
    with make_tempfile(spaced_name):
        for requested in ('foo with spaces', 'foo with spaces.py'):
            nt.assert_equal(path.get_py_filename(requested), spaced_name)
        for quoted in ('"foo with spaces.py"', "'foo with spaces.py'"):
            nt.assert_raises(IOError, path.get_py_filename, quoted)