我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用warnings.filterwarnings()。
def emits_warning_on(db, *messages): """Mark a test as emitting a warning on a specific dialect. With no arguments, squelches all SAWarning failures. Or pass one or more strings; these will be matched to the root of the warning description by warnings.filterwarnings(). Note that emits_warning_on does **not** assert that the warnings were in fact seen. """ @decorator def decorate(fn, *args, **kw): with expect_warnings_on(db, assert_=False, *messages): return fn(*args, **kw) return decorate
def uses_deprecated(*messages): """Mark a test as immune from fatal deprecation warnings. With no arguments, squelches all SADeprecationWarning failures. Or pass one or more strings; these will be matched to the root of the warning description by warnings.filterwarnings(). As a special case, you may pass a function name prefixed with // and it will be re-written as needed to match the standard warning verbiage emitted by the sqlalchemy.util.deprecated decorator. Note that uses_deprecated does **not** assert that the warnings were in fact seen. """ @decorator def decorate(fn, *args, **kw): with expect_deprecated(*messages, assert_=False): return fn(*args, **kw) return decorate
def assert_warnings(fn, warning_msgs, regex=False): """Assert that each of the given warnings are emitted by fn.""" from .assertions import eq_ with warnings.catch_warnings(record=True) as log: # ensure that nothing is going into __warningregistry__ warnings.filterwarnings("always") result = fn() for warning in log: popwarn = warning_msgs.pop(0) if regex: assert re.match(popwarn, str(warning.message)) else: eq_(popwarn, str(warning.message)) return result
def run(): # pragma: no cover """Run Markdown from the command line.""" # Parse options and adjust logging level if necessary options, logging_level = parse_options() if not options: sys.exit(2) logger.setLevel(logging_level) console_handler = logging.StreamHandler() logger.addHandler(console_handler) if logging_level <= WARNING: # Ensure deprecation warnings get displayed warnings.filterwarnings('default') logging.captureWarnings(True) warn_logger = logging.getLogger('py.warnings') warn_logger.addHandler(console_handler) # Run markdown.markdownFromFile(**options)
def runWithWarningsSuppressed(suppressedWarnings, f, *a, **kw): """Run the function C{f}, but with some warnings suppressed. @param suppressedWarnings: A list of arguments to pass to filterwarnings. Must be a sequence of 2-tuples (args, kwargs). @param f: A callable, followed by its arguments and keyword arguments """ for args, kwargs in suppressedWarnings: warnings.filterwarnings(*args, **kwargs) addedFilters = warnings.filters[:len(suppressedWarnings)] try: result = f(*a, **kw) except: exc_info = sys.exc_info() _resetWarningFilters(None, addedFilters) raise exc_info[0], exc_info[1], exc_info[2] else: if isinstance(result, defer.Deferred): result.addBoth(_resetWarningFilters, addedFilters) else: _resetWarningFilters(None, addedFilters) return result
def test_norm_hash_name(self): """norm_hash_name()""" from itertools import chain from passlib.crypto.digest import norm_hash_name, _known_hash_names # snapshot warning state, ignore unknown hash warnings ctx = warnings.catch_warnings() ctx.__enter__() self.addCleanup(ctx.__exit__) warnings.filterwarnings("ignore", '.*unknown hash') # test string types self.assertEqual(norm_hash_name(u("MD4")), "md4") self.assertEqual(norm_hash_name(b"MD4"), "md4") self.assertRaises(TypeError, norm_hash_name, None) # test selected results for row in chain(_known_hash_names, self.norm_hash_samples): for idx, format in enumerate(self.norm_hash_formats): correct = row[idx] for value in row: result = norm_hash_name(value, format) self.assertEqual(result, correct, "name=%r, format=%r:" % (value, format))
def test_90_special(self): """test marker option & special behavior""" warnings.filterwarnings("ignore", "passing settings to .*.hash\(\) is deprecated") handler = self.handler # preserve hash if provided self.assertEqual(handler.genhash("stub", "!asd"), "!asd") # use marker if no hash self.assertEqual(handler.genhash("stub", ""), handler.default_marker) self.assertEqual(handler.hash("stub"), handler.default_marker) self.assertEqual(handler.using().default_marker, handler.default_marker) # custom marker self.assertEqual(handler.genhash("stub", "", marker="*xxx"), "*xxx") self.assertEqual(handler.hash("stub", marker="*xxx"), "*xxx") self.assertEqual(handler.using(marker="*xxx").hash("stub"), "*xxx") # reject invalid marker self.assertRaises(ValueError, handler.genhash, 'stub', "", marker='abc') self.assertRaises(ValueError, handler.hash, 'stub', marker='abc') self.assertRaises(ValueError, handler.using, marker='abc')
def test_15_min_verify_time(self): """test get_min_verify_time() method""" # silence deprecation warnings for min verify time warnings.filterwarnings("ignore", category=DeprecationWarning) pa = CryptPolicy() self.assertEqual(pa.get_min_verify_time(), 0) self.assertEqual(pa.get_min_verify_time('admin'), 0) pb = pa.replace(min_verify_time=.1) self.assertEqual(pb.get_min_verify_time(), 0) self.assertEqual(pb.get_min_verify_time('admin'), 0) #=================================================================== # serialization #===================================================================
def setUpWarnings(self): """helper to init warning filters before subclass setUp()""" if self.resetWarningState: ctx = reset_warnings() ctx.__enter__() self.addCleanup(ctx.__exit__) # ignore warnings about PasswordHash features deprecated in 1.7 # TODO: should be cleaned in 2.0, when support will be dropped. # should be kept until then, so we test the legacy paths. warnings.filterwarnings("ignore", r"the method .*\.(encrypt|genconfig|genhash)\(\) is deprecated") warnings.filterwarnings("ignore", r"the 'vary_rounds' option is deprecated") #--------------------------------------------------------------- # tweak message formatting so longMessage mode is only enabled # if msg ends with ":", and turn on longMessage by default. #---------------------------------------------------------------
def test_pipe_handle(self): h, _ = windows_utils.pipe(overlapped=(True, True)) _winapi.CloseHandle(_) p = windows_utils.PipeHandle(h) self.assertEqual(p.fileno(), h) self.assertEqual(p.handle, h) # check garbage collection of p closes handle with warnings.catch_warnings(): warnings.filterwarnings("ignore", "", ResourceWarning) del p support.gc_collect() try: _winapi.CloseHandle(h) except OSError as e: self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE else: raise RuntimeError('expected ERROR_INVALID_HANDLE')
def test_empty_tuple_index(self): # Empty tuple index creates a view a = np.array([1, 2, 3]) assert_equal(a[()], a) assert_(a[()].base is a) a = np.array(0) assert_(isinstance(a[()], np.int_)) # Regression, it needs to fall through integer and fancy indexing # cases, so need the with statement to ignore the non-integer error. with warnings.catch_warnings(): warnings.filterwarnings('ignore', '', DeprecationWarning) a = np.array([1.]) assert_(isinstance(a[0.], np.float_)) a = np.array([np.array(1)], dtype=object) assert_(isinstance(a[0.], np.ndarray))
def test_scalar_none_comparison(self): # Scalars should still just return False and not give a warnings. # The comparisons are flagged by pep8, ignore that. with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', FutureWarning) assert_(not np.float32(1) == None) assert_(not np.str_('test') == None) # This is dubious (see below): assert_(not np.datetime64('NaT') == None) assert_(np.float32(1) != None) assert_(np.str_('test') != None) # This is dubious (see below): assert_(np.datetime64('NaT') != None) assert_(len(w) == 0) # For documentation purposes, this is why the datetime is dubious. # At the time of deprecation this was no behaviour change, but # it has to be considered when the deprecations are done. assert_(np.equal(np.datetime64('NaT'), None))
def test_identity_equality_mismatch(self): a = np.array([np.nan], dtype=object) with warnings.catch_warnings(): warnings.filterwarnings('always', '', FutureWarning) assert_warns(FutureWarning, np.equal, a, a) assert_warns(FutureWarning, np.not_equal, a, a) with warnings.catch_warnings(): warnings.filterwarnings('error', '', FutureWarning) assert_raises(FutureWarning, np.equal, a, a) assert_raises(FutureWarning, np.not_equal, a, a) # And the other do not warn: with np.errstate(invalid='ignore'): np.less(a, a) np.greater(a, a) np.less_equal(a, a) np.greater_equal(a, a)
def test_skip_footer_with_invalid(self): with warnings.catch_warnings(): warnings.filterwarnings("ignore") basestr = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n' # Footer too small to get rid of all invalid values assert_raises(ValueError, np.genfromtxt, TextIO(basestr), skip_footer=1) # except ValueError: # pass a = np.genfromtxt( TextIO(basestr), skip_footer=1, invalid_raise=False) assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]])) # a = np.genfromtxt(TextIO(basestr), skip_footer=3) assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]])) # basestr = '1 1\n2 \n3 3\n4 4\n5 \n6 6\n7 7\n' a = np.genfromtxt( TextIO(basestr), skip_footer=1, invalid_raise=False) assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.], [6., 6.]])) a = np.genfromtxt( TextIO(basestr), skip_footer=3, invalid_raise=False) assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.]]))
def test_version_2_0(): f = BytesIO() # requires more than 2 byte for header dt = [(("%d" % i) * 100, float) for i in range(500)] d = np.ones(1000, dtype=dt) format.write_array(f, d, version=(2, 0)) with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', UserWarning) format.write_array(f, d) assert_(w[0].category is UserWarning) f.seek(0) n = format.read_array(f) assert_array_equal(d, n) # 1.0 requested but data cannot be saved this way assert_raises(ValueError, format.write_array, f, d, (1, 0))
def test_version_2_0_memmap(): # requires more than 2 byte for header dt = [(("%d" % i) * 100, float) for i in range(500)] d = np.ones(1000, dtype=dt) tf = tempfile.mktemp('', 'mmap', dir=tempdir) # 1.0 requested but data cannot be saved this way assert_raises(ValueError, format.open_memmap, tf, mode='w+', dtype=d.dtype, shape=d.shape, version=(1, 0)) ma = format.open_memmap(tf, mode='w+', dtype=d.dtype, shape=d.shape, version=(2, 0)) ma[...] = d del ma with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', UserWarning) ma = format.open_memmap(tf, mode='w+', dtype=d.dtype, shape=d.shape, version=None) assert_(w[0].category is UserWarning) ma[...] = d del ma ma = format.open_memmap(tf, mode='r') assert_array_equal(ma, d)
def test_basic(self): a = [1, 2, 3] assert_equal(insert(a, 0, 1), [1, 1, 2, 3]) assert_equal(insert(a, 3, 1), [1, 2, 3, 1]) assert_equal(insert(a, [1, 1, 1], [1, 2, 3]), [1, 1, 2, 3, 2, 3]) assert_equal(insert(a, 1, [1, 2, 3]), [1, 1, 2, 3, 2, 3]) assert_equal(insert(a, [1, -1, 3], 9), [1, 9, 2, 9, 3, 9]) assert_equal(insert(a, slice(-1, None, -1), 9), [9, 1, 9, 2, 9, 3]) assert_equal(insert(a, [-1, 1, 3], [7, 8, 9]), [1, 8, 2, 7, 3, 9]) b = np.array([0, 1], dtype=np.float64) assert_equal(insert(b, 0, b[0]), [0., 0., 1.]) assert_equal(insert(b, [], []), b) # Bools will be treated differently in the future: # assert_equal(insert(a, np.array([True]*4), 9), [9, 1, 9, 2, 9, 3, 9]) with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', FutureWarning) assert_equal( insert(a, np.array([True] * 4), 9), [1, 9, 9, 9, 9, 2, 3]) assert_(w[0].category is FutureWarning)
def test_out_nan(self): with warnings.catch_warnings(record=True): warnings.filterwarnings('always', '', RuntimeWarning) o = np.zeros((4,)) d = np.ones((3, 4)) d[2, 1] = np.nan assert_equal(np.percentile(d, 0, 0, out=o), o) assert_equal( np.percentile(d, 0, 0, interpolation='nearest', out=o), o) o = np.zeros((3,)) assert_equal(np.percentile(d, 1, 1, out=o), o) assert_equal( np.percentile(d, 1, 1, interpolation='nearest', out=o), o) o = np.zeros(()) assert_equal(np.percentile(d, 1, out=o), o) assert_equal( np.percentile(d, 1, interpolation='nearest', out=o), o)
def test_basic(self): a0 = np.array(1) a1 = np.arange(2) a2 = np.arange(6).reshape(2, 3) assert_equal(np.median(a0), 1) assert_allclose(np.median(a1), 0.5) assert_allclose(np.median(a2), 2.5) assert_allclose(np.median(a2, axis=0), [1.5, 2.5, 3.5]) assert_equal(np.median(a2, axis=1), [1, 4]) assert_allclose(np.median(a2, axis=None), 2.5) a = np.array([0.0444502, 0.0463301, 0.141249, 0.0606775]) assert_almost_equal((a[1] + a[3]) / 2., np.median(a)) a = np.array([0.0463301, 0.0444502, 0.141249]) assert_equal(a[0], np.median(a)) a = np.array([0.0444502, 0.141249, 0.0463301]) assert_equal(a[-1], np.median(a)) # check array scalar result assert_equal(np.median(a).ndim, 0) a[1] = np.nan with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', RuntimeWarning) assert_equal(np.median(a).ndim, 0) assert_(w[0].category is RuntimeWarning)
def test_inplace_addition_array_type(self): # Test of inplace additions for t in self.othertypes: with warnings.catch_warnings(record=True) as w: warnings.filterwarnings("always") (x, y, xm) = (_.astype(t) for _ in self.uint8data) m = xm.mask a = arange(10, dtype=t) a[-1] = masked x += a xm += a assert_equal(x, y + a) assert_equal(xm, y + a) assert_equal(xm.mask, mask_or(m, a.mask)) assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_subtraction_array_type(self): # Test of inplace subtractions for t in self.othertypes: with warnings.catch_warnings(record=True) as w: warnings.filterwarnings("always") (x, y, xm) = (_.astype(t) for _ in self.uint8data) m = xm.mask a = arange(10, dtype=t) a[-1] = masked x -= a xm -= a assert_equal(x, y - a) assert_equal(xm, y - a) assert_equal(xm.mask, mask_or(m, a.mask)) assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_multiplication_array_type(self): # Test of inplace multiplication for t in self.othertypes: with warnings.catch_warnings(record=True) as w: warnings.filterwarnings("always") (x, y, xm) = (_.astype(t) for _ in self.uint8data) m = xm.mask a = arange(10, dtype=t) a[-1] = masked x *= a xm *= a assert_equal(x, y * a) assert_equal(xm, y * a) assert_equal(xm.mask, mask_or(m, a.mask)) assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_floor_division_array_type(self): # Test of inplace division for t in self.othertypes: with warnings.catch_warnings(record=True) as w: warnings.filterwarnings("always") (x, y, xm) = (_.astype(t) for _ in self.uint8data) m = xm.mask a = arange(10, dtype=t) a[-1] = masked x //= a xm //= a assert_equal(x, y // a) assert_equal(xm, y // a) assert_equal( xm.mask, mask_or(mask_or(m, a.mask), (a == t(0))) ) assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_issue_3(self): """ undefined methods datetime_or_None, date_or_None """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue3") c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)") try: c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None)) c.execute("select d from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select t from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select dt from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select ts from issue3") self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime)) finally: c.execute("drop table issue3")
def test_issue_8(self): """ Primary Key and Index error when selecting data """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists test") c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh` datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""") try: self.assertEqual(0, c.execute("SELECT * FROM test")) c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)") self.assertEqual(0, c.execute("SELECT * FROM test")) finally: c.execute("drop table test")
def test_issue_13(self): """ can't handle large result fields """ conn = self.connections[0] cur = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") cur.execute("drop table if exists issue13") try: cur.execute("create table issue13 (t text)") # ticket says 18k size = 18*1024 cur.execute("insert into issue13 (t) values (%s)", ("x" * size,)) cur.execute("select t from issue13") # use assertTrue so that obscenely huge error messages don't print r = cur.fetchone()[0] self.assertTrue("x" * size == r) finally: cur.execute("drop table issue13")
def test_issue_17(self): """could not connect mysql use passwod""" conn = self.connections[0] host = self.databases[0]["host"] db = self.databases[0]["db"] c = conn.cursor() # grant access to a table to a user with a password try: with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue17") c.execute("create table issue17 (x varchar(32) primary key)") c.execute("insert into issue17 (x) values ('hello, world!')") c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db) conn.commit() conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db) c2 = conn2.cursor() c2.execute("select x from issue17") self.assertEqual("hello, world!", c2.fetchone()[0]) finally: c.execute("drop table issue17")
def disabled_test_issue_54(self): conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue54") big_sql = "select * from issue54 where " big_sql += " and ".join("%d=%d" % (i,i) for i in range(0, 100000)) try: c.execute("create table issue54 (id integer primary key)") c.execute("insert into issue54 (id) values (7)") c.execute(big_sql) self.assertEqual(7, c.fetchone()[0]) finally: c.execute("drop table issue54")
def setUp(self): super(TestDictCursor, self).setUp() self.conn = conn = self.connections[0] c = conn.cursor(self.cursor_type) # create a table ane some data to query with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists dictcursor") # include in filterwarnings since for unbuffered dict cursor warning for lack of table # will only be propagated at start of next execute() call c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""") data = [("bob", 21, "1990-02-06 23:04:56"), ("jim", 56, "1955-05-09 13:12:45"), ("fred", 100, "1911-09-12 01:01:01")] c.executemany("insert into dictcursor values (%s,%s,%s)", data)
def expect_warnings(*messages): """Context manager to expect warnings with the given messages.""" filters = [dict(action='ignore', category=sa_exc.SAPendingDeprecationWarning)] if not messages: filters.append(dict(action='ignore', category=sa_exc.SAWarning)) else: filters.extend(dict(action='ignore', message=message, category=sa_exc.SAWarning) for message in messages) for f in filters: warnings.filterwarnings(**f) try: yield finally: resetwarnings()
def emits_warning_on(db, *warnings): """Mark a test as emitting a warning on a specific dialect. With no arguments, squelches all SAWarning failures. Or pass one or more strings; these will be matched to the root of the warning description by warnings.filterwarnings(). """ spec = db_spec(db) @decorator def decorate(fn, *args, **kw): if isinstance(db, util.string_types): if not spec(config._current): return fn(*args, **kw) else: wrapped = emits_warning(*warnings)(fn) return wrapped(*args, **kw) else: if not _is_excluded(*db): return fn(*args, **kw) else: wrapped = emits_warning(*warnings)(fn) return wrapped(*args, **kw) return decorate
def uses_deprecated(*messages): """Mark a test as immune from fatal deprecation warnings. With no arguments, squelches all SADeprecationWarning failures. Or pass one or more strings; these will be matched to the root of the warning description by warnings.filterwarnings(). As a special case, you may pass a function name prefixed with // and it will be re-written as needed to match the standard warning verbiage emitted by the sqlalchemy.util.deprecated decorator. """ @decorator def decorate(fn, *args, **kw): with expect_deprecated(*messages): return fn(*args, **kw) return decorate
def fit(self, X, Y=None): import scipy.sparse import sklearn.decomposition self.preprocessor = sklearn.decomposition.KernelPCA( n_components=self.n_components, kernel=self.kernel, degree=self.degree, gamma=self.gamma, coef0=self.coef0, remove_zero_eig=True) if scipy.sparse.issparse(X): X = X.astype(np.float64) with warnings.catch_warnings(): warnings.filterwarnings("error") self.preprocessor.fit(X) # Raise an informative error message, equation is based ~line 249 in # kernel_pca.py in scikit-learn if len(self.preprocessor.alphas_ / self.preprocessor.lambdas_) == 0: raise ValueError('KernelPCA removed all features!') return self
def fit(self, X, Y=None): import sklearn.decomposition self.preprocessor = sklearn.decomposition.FastICA( n_components=self.n_components, algorithm=self.algorithm, fun=self.fun, whiten=self.whiten, random_state=self.random_state ) # Make the RuntimeWarning an Exception! with warnings.catch_warnings(): warnings.filterwarnings("error") try: self.preprocessor.fit(X) except ValueError as e: if 'array must not contain infs or NaNs' in e.args[0]: raise ValueError("Bug in scikit-learn: https://github.com/scikit-learn/scikit-learn/pull/2738") else: import traceback traceback.format_exc() raise ValueError() return self
def _mkstemp(*args, **kw): old_open = os.open try: # temporarily bypass sandboxing os.open = os_open return tempfile.mkstemp(*args, **kw) finally: # and then put it back os.open = old_open # Silence the PEP440Warning by default, so that end users don't get hit by it # randomly just because they use pkg_resources. We want to append the rule # because we want earlier uses of filterwarnings to take precedence over this # one.