The following 50 code examples, extracted from open-source Python projects, illustrate how to use nose.tools.raises().
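Before the project examples, here is a minimal sketch of what the decorator does (the test function and exceptions below are illustrative placeholders, not taken from any of the projects quoted here): nose.tools.raises(*exceptions) wraps a test so that it passes only if its body raises one of the listed exception types, fails with an AssertionError if nothing is raised, and lets any unexpected exception propagate.

from nose.tools import raises

# The decorated test passes only if its body raises one of the listed
# exception types; if nothing is raised, nose reports an AssertionError.
@raises(ZeroDivisionError)
def test_division_by_zero():
    1 / 0

# raises(...) can also wrap an existing callable inline, the pattern many
# of the examples below use as `raises(SomeError)(func)`:
def parse_bad_int():
    int("not a number")  # raises ValueError

check = raises(ValueError)(parse_bad_int)
check()  # returns silently, because the expected ValueError was raised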
def test_invalid_dataset():
    class WrongNumberOfArgsDataSource(FileDataSource):
        def collect_files(self):
            return ["dummy.txt"]

        def collect_features(self, path, this_is_not_needed):
            pass

    class WrongNumberOfCollectedFilesDataSource(FileDataSource):
        def collect_files(self):
            return ["dummy.txt"] * 1, ["dummy.txt"] * 2

        def collect_features(self, path):
            pass

    def __test_wrong_num_args():
        X = FileSourceDataset(WrongNumberOfArgsDataSource())
        X[0]

    def __test_wrong_num_collected_files():
        X = FileSourceDataset(WrongNumberOfCollectedFilesDataSource())
        X[0]

    yield raises(TypeError)(__test_wrong_num_args)
    yield raises(RuntimeError)(__test_wrong_num_collected_files)
def test_voice_statistics_dummy():
    data_source = voice_statistics.WavFileDataSource("dummy", speakers=["fujitou"])

    @raises(ValueError)
    def __test_invalid_speaker():
        data_source = voice_statistics.WavFileDataSource("dummy", speakers=["test"])

    @raises(ValueError)
    def __test_invalid_emotion():
        data_source = voice_statistics.WavFileDataSource(
            "dummy", speakers=["fujitou"], emotions="nnmnkwii")

    @raises(RuntimeError)
    def __test_nodir(data_source):
        data_source.collect_files()

    __test_invalid_speaker()
    __test_invalid_emotion()
    __test_nodir(data_source)
def test_data_watcher_once(self):
    update = threading.Event()
    data = [True]

    # Make it a non-existent path
    self.path += 'f'

    dwatcher = self.client.DataWatch(self.path)

    @dwatcher
    def changed(d, stat):
        data.pop()
        data.append(d)
        update.set()

    update.wait(10)
    eq_(data, [None])
    update.clear()

    @raises(KazooException)
    def test_it():
        @dwatcher
        def func(d, stat):
            data.pop()
    test_it()
def test_child_watcher_once(self):
    update = threading.Event()
    all_children = ['fred']

    cwatch = self.client.ChildrenWatch(self.path)

    @cwatch
    def changed(children):
        while all_children:
            all_children.pop()
        all_children.extend(children)
        update.set()

    update.wait(10)
    eq_(all_children, [])
    update.clear()

    @raises(KazooException)
    def test_it():
        @cwatch
        def changed_again(children):
            update.set()
    test_it()
def test_async_exception(self):
    @raises(IOError)
    def check_exc(r):
        r.get()

    def broken():
        raise IOError("Failed")

    with start_stop_one() as handler:
        r = handler.async_result()
        w = handler.spawn(utils.wrap(r)(broken))
        w.join()

    self.assertFalse(r.successful())
    check_exc(r)
def test_scipy_lbfgsb():
    sess = tf.Session()
    x = tf.Variable(np.float64(2), name='x')
    sess.run(tf.initialize_variables([x]))
    optimizer = ScipyLBFGSBOptimizer(verbose=True, session=sess)

    # With gradient
    results = optimizer.minimize([x], x**2, [2 * x])
    assert results.success

    # Without gradient
    results = optimizer.minimize([x], x**2)
    assert results.success

    # Test callback
    def callback(xs):
        pass

    optimizer = ScipyLBFGSBOptimizer(verbose=True, session=sess,
                                     callback=callback)
    assert optimizer.minimize([x], x**2).success

    @raises(ValueError)
    def test_illegal_parameter_as_variable1():
        optimizer.minimize([42], x**2)
    test_illegal_parameter_as_variable1()

    @raises(ValueError)
    def test_illegal_parameter_as_variable2():
        optimizer.minimize(42, x**2)
    test_illegal_parameter_as_variable2()
def test_migrad():
    sess = tf.Session()
    x = tf.Variable(np.float64(2), name='x')
    sess.run(tf.initialize_variables([x]))
    optimizer = MigradOptimizer(session=sess)

    # With gradient
    results = optimizer.minimize([x], x**2, [2 * x])
    assert results.success

    # Without gradient
    results = optimizer.minimize([x], x**2)
    assert results.success

    @raises(ValueError)
    def test_illegal_parameter_as_variable1():
        optimizer.minimize([42], x**2)
    test_illegal_parameter_as_variable1()

    @raises(ValueError)
    def test_illegal_parameter_as_variable2():
        optimizer.minimize(42, x**2)
    test_illegal_parameter_as_variable2()
def test_init():
    with tp.Model():
        X1 = tp.Uniform(lower=-1, upper=1)
        X2 = tp.Uniform(lower=-1)
        X3 = tp.Uniform(upper=1)
        X4 = tp.Uniform()
        X7 = tp.Uniform(lower=X1, upper=X2)

# @raises(ValueError)
# def test_uniform_fail_lower():
#     with tp.Model():
#         X1 = tp.Uniform()
#         X2 = tp.Uniform(lower=X1)

# @raises(ValueError)
# def test_uniform_fail_upper():
#     with tp.Model() as model:
#         X1 = tp.Uniform()
#         X2 = tp.Uniform(upper=X1)
def test_inconsistent_inner_fct(self):
    # Test that scan can detect inconsistencies in the inner graph and
    # raises an appropriate exception. The pickled file used in this test
    # relies on the cuda backend.

    # This test has not been extensively tested for Python 3 so it should
    # be skipped if python version is >= 3
    version = sys.version_info
    if version >= (3,):
        raise SkipTest("This test relies on a pickled file produced with "
                       "Python 2. The current python version "
                       "(%i.%i.%i.%i) is >= 3 so the test will be "
                       "skipped." % (version.major, version.minor,
                                     version.micro, version.serial))

    # When unpickled, the scan op should perform validation on its inner
    # graph, detect the inconsistencies and raise a TypeError
    folder = os.path.dirname(os.path.abspath(__file__))
    path = os.path.join(folder, "inconsistent_scan.pkl")
    assert_raises(TypeError, pickle.load, open(path, "r"))
def test_formpack_version_cannot_have_name(self):
    vdata = copy(SINGLE_NOTE_SURVEY)
    vdata['name'] = "somename"
    FormPack(id_string="idstring", versions=[
        vdata,
    ])

# TODO: remove this test or fix it
# @raises(PyXFormError)
# def test_xform(self):
#     fp = FormPack(title='test_fixture_title',
#                   root_node_name='daata',
#                   versions=[
#                       SINGLE_NOTE_SURVEY,
#                   ])
#     fp.versions[0].to_xml()
def test_csv_with_tag_headers(self):
    title, schemas, submissions = build_fixture('dietary_needs')
    fp = FormPack(schemas, title)
    options = {'versions': 'dietv1', 'tag_cols_for_header': ['hxl']}
    rows = list(fp.export(**options).to_csv(submissions))
    assert rows[1] == (u'"#loc +name";"#indicator +diet";'
                       u'"#indicator +diet";"#indicator +diet";'
                       u'"#indicator +diet";"#indicator +diet"')

# disabled for now
# @raises(RuntimeError)
# def test_csv_on_repeatable_groups(self):
#     title, schemas, submissions = build_fixture('grouped_repeatable')
#     fp = FormPack(schemas, title)
#     options = {'versions': 'rgv1'}
#     list(fp.export(**options).to_csv(submissions))
def test_dwm_init_fields_badlookup():
    """ test Dwm class raises error with bad lookup type """
    fields = {
        'field1': {
            'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
                       'fieldSpecificLookup', 'normLookup', 'badlookup'],
            'derive': [
                {
                    'type': 'deriveIncludes',
                    'fieldSet': ['field2'],
                    'options': []
                }
            ]
        }
    }

    Dwm(name='test', mongo=DB, fields=fields)
def test_dwm_init_fields_badderive():
    """ test Dwm class raises error with bad derive type """
    fields = {
        'field1': {
            'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
                       'fieldSpecificLookup', 'normLookup', 'normIncludes'],
            'derive': [
                {
                    'type': 'badderive',
                    'fieldSet': ['field2'],
                    'options': []
                }
            ]
        }
    }

    Dwm(name='test', mongo=DB, fields=fields)
def test_dwm_init_fields_badopt():
    """ test Dwm class raises error with bad derive option type """
    fields = {
        'field1': {
            'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
                       'fieldSpecificLookup', 'normLookup', 'normIncludes'],
            'derive': [
                {
                    'type': 'deriveIncludes',
                    'fieldSet': ['field2'],
                    'options': ['badoption']
                }
            ]
        }
    }

    Dwm(name='test', mongo=DB, fields=fields)

# Initialize with User-Defined Functions
def test_yql_object_one(self):
    """Test that invalid query raises AttributeError"""
    yqlobj.query = 1
def test_one(self):
    """Test that accessing one result raises exception"""
    yqlobj.one()
def test_hts_append():
    lab_path = join(DATA_DIR, "BASIC5000_0001.lab")
    test_labels = hts.load(lab_path)
    print("\n{}".format(test_labels))

    # should get same string representation
    labels = hts.HTSLabelFile()
    assert str(labels) == ""
    for label in test_labels:
        labels.append(label)
    assert str(test_labels) == str(labels)

    @raises(ValueError)
    def test_invalid_start_time():
        l = hts.HTSLabelFile()
        l.append((100000, 0, "NG"))

    def test_succeeding_times():
        l = hts.HTSLabelFile()
        l.append((0, 1000000, "OK"))
        l.append((1000000, 2000000, "OK"))

    @raises(ValueError)
    def test_non_succeeding_times():
        l = hts.HTSLabelFile()
        l.append((0, 1000000, "OK"))
        l.append((1500000, 2000000, "NG"))

    test_invalid_start_time()
    test_succeeding_times()
    test_non_succeeding_times()  # shouldn't raise RuntimeError
def test_empty_dataset():
    class EmptyDataSource(FileDataSource):
        def collect_files(self):
            return []

        def collect_features(self, path):
            pass

    X = FileSourceDataset(EmptyDataSource())

    def __test_outof_range(X):
        print(X[0])  # Should raise IndexError

    yield raises(IndexError)(__test_outof_range), X
def test_asarray():
    X, Y = _get_small_datasets(padded=False, duration=True)
    lengths = [len(x) for x in X]
    X, Y = _get_small_datasets(
        padded=True, duration=True, padded_length=np.max(lengths))
    X_array = np.asarray(X)
    assert X_array.ndim == 3
    assert np.allclose(X_array, X.asarray())

    # Explicitly set padded length to the actual max time length
    X, Y = _get_small_datasets(padded=False, duration=True)
    assert np.allclose(X_array, X.asarray(padded_length=np.max(lengths)))

    # Make sure that auto-guessing padded_length gives the same result as an
    # explicitly given max time length
    assert np.allclose(X_array, X.asarray(padded_length=None))

    # Force triggering re-allocations
    assert np.allclose(X_array, X.asarray(
        padded_length=None, padded_length_guess=1))

    def __test_very_small_padded_length():
        X, Y = _get_small_datasets(padded=False, duration=True)
        X.asarray(padded_length=1)  # Should raise `num frames exceeded`

    yield raises(RuntimeError)(__test_very_small_padded_length)
def test_sequence_wise_torch_data_loader():
    import torch
    from torch.utils import data as data_utils

    X, Y = _get_small_datasets(padded=False)

    class TorchDataset(data_utils.Dataset):
        def __init__(self, X, Y):
            self.X = X
            self.Y = Y

        def __getitem__(self, idx):
            return torch.from_numpy(self.X[idx]), torch.from_numpy(self.Y[idx])

        def __len__(self):
            return len(self.X)

    def __test(X, Y, batch_size):
        dataset = TorchDataset(X, Y)
        loader = data_utils.DataLoader(
            dataset, batch_size=batch_size, num_workers=1, shuffle=True)
        for idx, (x, y) in enumerate(loader):
            assert len(x.shape) == len(y.shape)
            assert len(x.shape) == 3
            print(idx, x.shape, y.shape)

    # Test with batch_size = 1
    yield __test, X, Y, 1
    # Since we have variable-length frames, a batch size larger than 1 causes
    # a runtime error.
    yield raises(RuntimeError)(__test), X, Y, 2

    # For a padded dataset, which can be represented by (N, T^max, D), the
    # batch size can be any number.
    X, Y = _get_small_datasets(padded=True)
    yield __test, X, Y, 1
    yield __test, X, Y, 2
def test_ljspeech_dummy():
    data_sources = [ljspeech.TranscriptionDataSource,
                    ljspeech.NormalizedTranscriptionDataSource,
                    ljspeech.WavFileDataSource]

    for data_source in data_sources:
        @raises(RuntimeError)
        def f(source):
            source("dummy")
        f(data_source)
def test_vcc2016_dummy():
    data_source = vcc2016.WavFileDataSource("dummy", speakers=["SF1"])

    @raises(ValueError)
    def __test_invalid_speaker():
        data_source = vcc2016.WavFileDataSource("dummy", speakers=["test"])

    @raises(RuntimeError)
    def __test_nodir(data_source):
        data_source.collect_files()

    __test_invalid_speaker()
    __test_nodir(data_source)
def test_jsut_dummy():
    data_sources = [jsut.TranscriptionDataSource,
                    jsut.WavFileDataSource]

    for data_source in data_sources:
        @raises(RuntimeError)
        def f(source):
            source("dummy")
        f(data_source)
def test_vctk_dummy():
    assert len(vctk.available_speakers) == 108
    data_sources = [vctk.TranscriptionDataSource,
                    vctk.WavFileDataSource]

    for data_source in data_sources:
        @raises(RuntimeError)
        def f(source):
            source("dummy")
        f(data_source)
def test_invalid_duration_features():
    phone_labels = hts.load(example_label_file(phone_level=True))

    @raises(ValueError)
    def __test(labels, unit_size, feature_size):
        fe.duration_features(labels, unit_size=unit_size,
                             feature_size=feature_size)

    yield __test, phone_labels, None, "frame"
def test_modspec_smoothing():
    static_dim = 2
    T = 64
    np.random.seed(1234)
    y = np.random.rand(T, static_dim)
    modfs = 200

    for log_domain in [True, False]:
        for norm in [None, "ortho"]:
            for n in [1024, 2048]:
                # Nyquist freq
                y_hat = P.modspec_smoothing(y, modfs, n=n, norm=norm,
                                            cutoff=modfs // 2,
                                            log_domain=log_domain)
                assert np.allclose(y, y_hat)

                # Smooth
                P.modspec_smoothing(y, modfs, n=n, norm=norm,
                                    cutoff=modfs // 4, log_domain=log_domain)

    # Cutoff frequency larger than modfs//2
    @raises(ValueError)
    def __test_invalid_param(y, modfs):
        P.modspec_smoothing(y, modfs, n=2048, cutoff=modfs // 2 + 1)

    # FFT size should be larger than the time length
    @raises(RuntimeError)
    def __test_invalid_time_length(y, modfs):
        P.modspec_smoothing(y, modfs, n=32, cutoff=modfs // 2)

    __test_invalid_time_length(y, modfs)
    __test_invalid_param(y, modfs)
def test_send_verification_with_no_number(self):
    """
    Test that send_verification with no number specified raises a ValueError
    """
    self.service.send_verification('', self.request)
def test_create_url_with_no_key(self):
    """
    Test that create_url with no key raises ValueError
    """
    self.service.create_url(self.request, None)
def test_bad_watch_func2(self):
    counter = 0

    @self.client.DataWatch(self.path)
    def changed(d, stat):
        if counter > 0:
            raise Exception("oops")

    raises(Exception)(changed)

    counter += 1
    self.client.set(self.path, b'asdfasdf')
def test_bad_children_watch_func(self):
    counter = 0

    @self.client.ChildrenWatch(self.path)
    def changed(children):
        if counter > 0:
            raise Exception("oops")

    raises(Exception)(changed)

    counter += 1
    self.client.create(self.path + '/' + 'smith')
def test_exception(self):
    from kazoo.exceptions import NoNodeError
    watcher = self._makeOne(self.client, self.path, 0.1)
    result = watcher.start()

    @raises(NoNodeError)
    def testit():
        result.get()
    testit()
def test_get_with_no_block(self):
    handler = eventlet_handler.SequentialEventletHandler()

    @raises(handler.timeout_exception)
    def test_no_block(r):
        r.get(block=False)

    with start_stop_one(handler):
        r = handler.async_result()
        test_no_block(r)
        r.set(1)
        self.assertEqual(1, r.get())
def test_bad_deserialization(self):
    async_object = self.client.handler.async_result()
    self.client._queue.append(
        (Delete(self.client.chroot, -1), async_object))
    self.client._connection._write_sock.send(b'\0')

    @raises(ValueError)
    def testit():
        async_object.get()
    testit()
def test_read_only(self):
    from kazoo.exceptions import NotReadOnlyCallError
    from kazoo.protocol.states import KeeperState

    client = self.client
    states = []
    ev = threading.Event()

    @client.add_listener
    def listen(state):
        states.append(state)
        if client.client_state == KeeperState.CONNECTED_RO:
            ev.set()

    try:
        self.cluster[1].stop()
        self.cluster[2].stop()
        ev.wait(6)
        eq_(ev.is_set(), True)
        eq_(client.client_state, KeeperState.CONNECTED_RO)

        # Test read only command
        eq_(client.get_children('/'), [])

        # Test error with write command
        @raises(NotReadOnlyCallError)
        def testit():
            client.create('/fred')
        testit()

        # Wait for a ping
        time.sleep(15)
    finally:
        client.remove_listener(listen)
        self.cluster[1].run()
        self.cluster[2].run()
def test_version_no_connection(self):
    @raises(ConnectionLoss)
    def testit():
        self.client.server_version()

    self.client.stop()
    testit()
def test_bad_creates(self):
    args_list = [(True,), ('/smith', 0), ('/smith', b'', 'bleh'),
                 ('/smith', b'', None, 'fred'),
                 ('/smith', b'', None, True, 'fred')]

    @raises(TypeError)
    def testit(args):
        t = self.client.transaction()
        t.create(*args)

    for args in args_list:
        testit(args)
def test_bad_deletes(self):
    args_list = [(True,), ('/smith', 'woops'), ]

    @raises(TypeError)
    def testit(args):
        t = self.client.transaction()
        t.delete(*args)

    for args in args_list:
        testit(args)
def test_bad_sets(self):
    args_list = [(42, 52), ('/smith', False), ('/smith', b'', 'oops')]

    @raises(TypeError)
    def testit(args):
        t = self.client.transaction()
        t.set_data(*args)

    for args in args_list:
        testit(args)
def test_bad_checks(self):
    args_list = [(42, 52), ('/smith', 'oops')]

    @raises(TypeError)
    def testit(args):
        t = self.client.transaction()
        t.check(*args)

    for args in args_list:
        testit(args)
def test_bad_commit(self):
    t = self.client.transaction()

    @raises(ValueError)
    def testit():
        t.commit()

    t.committed = True
    testit()
def test_exception_raising(self):
    h = self._makeOne()

    @raises(h.timeout_exception)
    def testit():
        raise h.timeout_exception("This is a timeout")
    testit()
def test_get_with_nowait(self):
    mock_handler = mock.Mock()
    # note: "async" is a reserved word in Python 3.7+, so avoid it as a name
    async_result = self._makeOne(mock_handler)
    timeout = self._makeHandler().timeout_exception

    @raises(timeout)
    def test_it():
        async_result.get(block=False)
    test_it()

    @raises(timeout)
    def test_nowait():
        async_result.get_nowait()
    test_nowait()
def test_DatasetBase():
    ds = DatasetBase()
    nt.raises(NotImplementedError, ds.segments_info)
    nt.raises(NotImplementedError, ds.video_info)
def test_custom_encoding_text(self):
    test_file = self.test_dir / 'test_file.txt'
    with stor.open(test_file, mode='w', encoding='utf-16') as fp:
        fp.write(STRING_STRING)

    with stor.open(test_file, mode='r', encoding='utf-16') as fp:
        result = fp.read()
    assert result == STRING_STRING

    with pytest.raises(UnicodeDecodeError):
        with stor.open(test_file, mode='r', encoding='utf-8') as fp:
            result = fp.read()