我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用warnings.simplefilter()。
def fail_if_not_removed(method):
    """Decorate a test method to track removal of deprecated code.

    This decorator catches :class:`~deprecation.UnsupportedWarning`
    warnings that occur during testing and causes unittests to fail,
    making it easier to keep track of when code should be removed.

    :param method: The test callable to wrap.
    :returns: A wrapper that forwards all arguments to ``method`` and
              returns whatever ``method`` returns.
    :raises: :class:`AssertionError` if an
             :class:`~deprecation.UnsupportedWarning`
             is raised while running the test method.
    """
    import functools

    # Keep the wrapped test's __name__/__doc__ so runners and reports still
    # identify the test by its real name instead of "_inner".
    @functools.wraps(method)
    def _inner(*args, **kwargs):
        with warnings.catch_warnings(record=True) as caught_warnings:
            # Record every warning, including ones already emitted before.
            warnings.simplefilter("always")
            rv = method(*args, **kwargs)

        # Inspect the recording only after the filter state was restored.
        for warning in caught_warnings:
            if warning.category == UnsupportedWarning:
                raise AssertionError(
                    ("%s uses a function that should be removed: %s" %
                     (method, str(warning.message))))
        return rv

    return _inner
def test_DeprecatedWarning_not_raised(self):
    # The method is marked as deprecated in 2.0 while the current version is
    # 1.0; the test expects that calling it emits no warning at all.
    expected = "lololol"

    class Test(object):

        @deprecation.deprecated(deprecated_in="2.0", removed_in="3.0",
                                current_version="1.0")
        def method(self):
            """method docstring"""
            return expected

    with warnings.catch_warnings(record=True):
        # Escalate warnings to exceptions: any emission makes the call fail.
        warnings.simplefilter("error")

        instance = Test()
        self.assertEqual(instance.method(), expected)
def cache_from_source(path, debug_override=None):
    """**DEPRECATED**

    Return the .pyc path that corresponds to the .py file at *path*.

    The .py file does not have to exist; the result is simply the location
    the byte-compiled file would have if the source were imported.

    If *debug_override* is not None it must be a boolean and is used in
    place of sys.flags.optimize.

    If sys.implementation.cache_tag is None then NotImplementedError is
    raised.

    """
    # Delegate to ``util.cache_from_source`` while hiding any warning that
    # the delegated call emits.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        pyc_path = util.cache_from_source(path, debug_override)
    return pyc_path
def test_security_dates_warning(self):
    # An equity that carries an explicit end date.
    end = pd.Timestamp('2012-01-01', tz='UTC')
    asset = Equity(1, symbol="TESTEQ", end_date=end)

    # Attributes that are each expected to emit one DeprecationWarning.
    legacy_attrs = (
        'security_start_date',
        'security_end_date',
        'security_name',
    )

    with warnings.catch_warnings(record=True) as caught:
        # Make sure no warning is swallowed by the "once per location" rule.
        warnings.simplefilter("always")
        for attr in legacy_attrs:
            getattr(asset, attr)

        # One warning per attribute read, all of them deprecations.
        self.assertEqual(3, len(caught))
        for entry in caught:
            self.assertTrue(issubclass(entry.category, DeprecationWarning))
def catch_warnings():
    """Catch warnings in a with block in a list"""
    # NOTE(review): this is a plain generator here; it is presumably wrapped
    # by ``contextlib.contextmanager`` outside the visible snippet.

    # make sure deprecation warnings are active in tests
    warnings.simplefilter('default', category=DeprecationWarning)

    # Work on a copy of the filter list so the saved one can be put back.
    saved_filters = warnings.filters
    warnings.filters = saved_filters[:]
    saved_hook = warnings.showwarning

    log = []

    def showwarning(message, category, filename, lineno, file=None, line=None):
        # Each recorded entry is the mapping returned by locals().
        log.append(locals())

    try:
        warnings.showwarning = showwarning
        yield log
    finally:
        # Undo both global modifications, whatever happened in the block.
        warnings.showwarning = saved_hook
        warnings.filters = saved_filters
def test_warnings(self):
    """Are warnings printed at the right time?"""
    # Setting up a more complicated repository
    setup_commands = (
        ["rm", "-rf", ".git"],
        ["cp", "-R", "repo_nx.git", ".git"],
    )
    for command in setup_commands:
        sub.call(command)

    nx_log = gitnet.get_log(os.getcwd())

    # Note: Depending on the environment a warning may or may not be raised.
    # For example, PyCharm uses UTF-8 encoding and thus will not raise the
    # unicode error.
    with warnings.catch_warnings(record=True) as caught:
        # Ensure warnings are being shown
        warnings.simplefilter("always")
        # Trigger Warning
        nx_log.detect_dup_emails()
        # Zero or one warning are both acceptable outcomes.
        self.assertTrue(len(caught) <= 1)
def test_undecorated_coroutine(self):
    # Define a test case whose coroutine test method lacks a decorator.
    namespace = exec_test(globals(), locals(), """
    class Test(AsyncTestCase):
        async def test_coro(self):
            pass
    """)

    case = namespace['Test']('test_coro')
    outcome = unittest.TestResult()

    # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        case.run(outcome)

    # Exactly one error is expected, and it must mention the decorator.
    errors = outcome.errors
    self.assertEqual(len(errors), 1)
    self.assertIn("should be decorated", errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
    # One-element list so the nested coroutine can hand its result back.
    result = [None]

    @gen.coroutine
    def fetch():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(response)
        self.stop_loop()

    # Schedule the coroutine, then let the supplied runner drive the loop.
    self.io_loop.add_callback(fetch)
    runner()
    return result[0]
def execute(self):
    """Run ``self.run()`` with warnings routed to ``self.log``.

    While the command runs, every warning is formatted with
    ``warnings.formatwarning`` and sent to ``self.log.warning``.

    Returns:
        Whatever ``self.run()`` returns.

    Raises:
        SystemExit: re-raised unchanged when ``run()`` exits on its own, or
            raised with status 1 after any other exception was logged via
            ``self.log.critical``.
    """
    # The signature mirrors the documented ``warnings.showwarning`` hook, so
    # callers that pass ``lineno`` by keyword work as well.
    def showwarning(message, category, filename, lineno,
                    file=None, line=None):
        msg = warnings.formatwarning(message, category, filename, lineno,
                                     line)
        self.log.warning(msg.strip())

    # catch_warnings restores both the filters and the hook on exit.
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        warnings.showwarning = showwarning
        try:
            return self.run()
        except SystemExit:
            raise
        except BaseException:
            # Deliberately broad top-level boundary: log the traceback and
            # turn the failure into a non-zero exit status.
            self.log.critical(traceback.format_exc().strip())
            sys.exit(1)
def configure_logging(logging_config, logging_settings):
    """Set up warning routing and apply the logging configuration.

    ``logging_config`` is a dotted path resolved with ``import_string``;
    the resulting callable receives ``logging_settings`` when those are
    truthy. Nothing beyond the warnings setup happens when
    ``logging_config`` is falsy.
    """
    warnings_untouched = not sys.warnoptions
    if warnings_untouched:
        # Route warnings through python logging
        logging.captureWarnings(True)
        # RemovedInNextVersionWarning is a subclass of DeprecationWarning
        # which is hidden by default, hence we force the "default" behavior
        warnings.simplefilter("default", RemovedInNextVersionWarning)

    if not logging_config:
        return

    # First find the logging configuration function ...
    configure = import_string(logging_config)

    logging.config.dictConfig(DEFAULT_LOGGING)

    # ... then invoke it with the logging settings
    if logging_settings:
        configure(logging_settings)
def guesscaption(simple=False):
    """Build a caption from the lines produced by ``guessformat()``.

    The raw guesses are printed first. The returned string then holds one
    line per guess that ``isgibber`` does not reject, each with the space
    before ``!`` / ``?`` removed and terminated by a newline.
    """
    candidates = guessformat()

    print("raw: ")
    for candidate in candidates:
        print(guessline(candidate, True))
    print()

    kept = []
    # Warnings emitted while generating the final lines are suppressed.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for candidate in candidates:
            line = guessline(candidate, simple=simple)
            if isgibber(line):
                continue
            kept.append(line.replace(" !", "!").replace(" ?", "?") + "\n")
    return "".join(kept)
def __enter__(self):
    """Start recording warnings; entering twice raises RuntimeError."""
    if self._entered:
        __tracebackhide__ = True
        raise RuntimeError("Cannot enter %r twice" % self)
    self._entered = True

    module = self._module

    # Save the current filters and continue on a private copy.
    self._filters = module.filters
    module.filters = self._filters[:]
    self._showwarning = module.showwarning

    def showwarning(message, category, filename, lineno,
                    file=None, line=None):
        record = RecordedWarning(
            message, category, filename, lineno, file, line)
        self._list.append(record)

        # still perform old showwarning functionality
        self._showwarning(
            message, category, filename, lineno, file=file, line=line)

    module.showwarning = showwarning

    # allow the same warning to be raised more than once
    module.simplefilter('always')

    return self
def deprecated(func_replacement):
    """
    Use this decorator on functions that are marked as deprecated.

    It takes a single argument: the name of the function that replaces the
    decorated one. Every call to the decorated function emits a
    ``DeprecationWarning`` that names both functions, then runs the original
    function and returns its result.
    """
    def _deprecated(func):
        @functools.wraps(func)
        def new_func(*args, **kwargs):
            # Force the warning to be shown for this call only. The filter
            # change is scoped with catch_warnings so the caller's filter
            # configuration is left exactly as it was.
            with warnings.catch_warnings():
                warnings.simplefilter('always', DeprecationWarning)
                warnings.warn(
                    'Call to deprecated function {}. Use new function: {}() instead.'
                    .format(func.__name__, func_replacement),
                    category=DeprecationWarning,
                    stacklevel=2)  # attribute the warning to the caller
            return func(*args, **kwargs)
        return new_func
    return _deprecated
def __enter__(self):
    """Enter the context, optionally resetting filters and registries."""
    # let parent class archive filter state
    result = super(reset_warnings, self).__enter__()

    # reset the filter to list everything
    if self._reset_filter:
        warnings.resetwarnings()
        warnings.simplefilter(self._reset_filter)

    # archive and clear the __warningregistry__ key for all modules
    # that match the 'reset' pattern.
    pattern = self._reset_registry
    if pattern:
        saved = self._orig_registry = {}
        # Iterate over a snapshot of sys.modules.
        for name, module in list(sys.modules.items()):
            if module is not None and pattern.match(name):
                registry = getattr(module, "__warningregistry__", None)
                if registry:
                    saved[name] = registry.copy()
                    registry.clear()
    return result
def get_credentials(flags):
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed to obtain the new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    store = Storage(flags.credfile)

    # Reading the stored credentials may emit warnings; hide them.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        credentials = store.get()

    usable = credentials and not credentials.invalid
    if not usable:
        flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
        credentials = tools.run_flow(flow, store, flags)
        print('credential file saved at\n\t' + flags.credfile)
    return credentials
def testAssertWarnsCallable(self):
    def _runtime_warn():
        warnings.warn("foo", RuntimeWarning)

    # Success when the right warning is triggered, even several times
    for _ in range(2):
        self.assertWarns(RuntimeWarning, _runtime_warn)

    # A tuple of warning classes is accepted
    accepted = (DeprecationWarning, RuntimeWarning)
    self.assertWarns(accepted, _runtime_warn)

    # *args and **kwargs also work
    self.assertWarns(RuntimeWarning,
                     warnings.warn, "foo", category=RuntimeWarning)

    # Failure when no warning is triggered
    with self.assertRaises(self.failureException):
        self.assertWarns(RuntimeWarning, lambda: 0)

    # Failure when another warning is triggered
    with catch_warnings():
        # Force default filter (in case tests are run with -We)
        warnings.simplefilter("default", RuntimeWarning)
        with self.assertRaises(self.failureException):
            self.assertWarns(DeprecationWarning, _runtime_warn)

    # Filters for other warnings are not modified
    with catch_warnings():
        warnings.simplefilter("error", RuntimeWarning)
        with self.assertRaises(RuntimeWarning):
            self.assertWarns(DeprecationWarning, _runtime_warn)
def test_loadTestsFromModule__use_load_tests_other_bad_keyword(self):
    # Passing unknown keyword arguments next to ``use_load_tests`` must
    # raise a TypeError that names the first unknown argument.
    m = types.ModuleType('m')

    class MyTestCase(unittest.TestCase):
        def test(self):
            pass
    m.testcase_1 = MyTestCase

    load_tests_args = []

    def load_tests(loader, tests, pattern):
        self.assertIsInstance(tests, unittest.TestSuite)
        load_tests_args.extend((loader, tests, pattern))
        return tests
    m.load_tests = load_tests

    loader = unittest.TestLoader()
    with warnings.catch_warnings():
        # 'ignore' is the valid filter action for suppressing warnings;
        # 'never' is not an action the warnings module accepts.
        warnings.simplefilter('ignore')
        with self.assertRaises(TypeError) as cm:
            loader.loadTestsFromModule(
                m, use_load_tests=False, very_bad=True, worse=False)
    self.assertEqual(type(cm.exception), TypeError)
    # The error message names the first bad argument alphabetically,
    # however use_load_tests (which sorts first) is ignored.
    self.assertEqual(
        str(cm.exception),
        "loadTestsFromModule() got an unexpected keyword argument 'very_bad'")
def _build_program(self):
    """Join all source pieces and build them as a single program."""
    # Header pieces come first, followed by the pieces of every unit.
    header_pieces = self.common_header.pieces
    unit_pieces = itertools.chain.from_iterable(
        unit.pieces for unit in self._compile_units)
    source = "\n".join(itertools.chain(header_pieces, unit_pieces))

    program = pyopencl.Program(self.context, source)
    with warnings.catch_warnings():
        # Intel OpenCL generates non empty output and that causes warnings
        # from pyopencl. We just silence them.
        warnings.simplefilter("ignore")
        return program.build(options=DEFAULT_COMPILER_OPTIONS)

    # Working around bug in pyopencl.Program.compile in pyopencl
    # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
    # TODO: Fix this in OpenCL
    # compiled_units = [unit.compile(self.context, self.common_header.pieces)
    #                   for unit in self._compile_units]
    # return pyopencl.link_program(self.context, compiled_units)
def test_bool_flat_indexing_invalid_nr_elements(self, level=rlevel):
    s = np.ones(10, dtype=float)
    x = np.array((15,), dtype=float)

    def ia(x, s, v):
        x[(s > 0)] = v

    # Value arrays whose length does not match the number of selected items.
    wrong_sizes = (9, 11)

    # After removing deprecation, the following are ValueErrors.
    # This might seem odd as compared to the value error below. This
    # is due to the fact that the new code always uses "nonzero" logic
    # and the boolean special case is not taken.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        warnings.simplefilter('ignore', np.VisibleDeprecationWarning)
        for size in wrong_sizes:
            self.assertRaises(IndexError, ia, x, s,
                              np.zeros(size, dtype=float))

    # Old special case (different code path):
    for size in wrong_sizes:
        self.assertRaises(ValueError, ia, x.flat, s,
                          np.zeros(size, dtype=float))
def test_ticket_1539(self):
    # Every numeric scalar type except timedelta64.
    dtypes = []
    for candidate in np.typeDict.values():
        if not issubclass(candidate, np.number):
            continue
        if issubclass(candidate, np.timedelta64):
            continue
        dtypes.append(candidate)

    empty = np.array([], dtypes[0])
    failures = []

    # ignore complex warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', np.ComplexWarning)
        for left_type in dtypes:
            left = empty.astype(left_type)
            for right_type in dtypes:
                right = empty.astype(right_type)
                try:
                    np.dot(left, right)
                except TypeError:
                    # Remember the pair; all failures are reported together.
                    failures.append((left_type, right_type))

    if failures:
        raise AssertionError("Failures: %r" % failures)
def test_warnings(self):
    # test warning code path
    # Each step: an operation plus the text its warning must contain.
    steps = (
        (lambda: np.divide(1, 0.), "divide by zero"),
        (lambda: np.array(1e300) * np.array(1e300), "overflow"),
        (lambda: np.array(np.inf) - np.array(np.inf), "invalid value"),
        (lambda: np.array(1e-300) * np.array(1e-300), "underflow"),
    )
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        with np.errstate(all="warn"):
            for count, (operation, fragment) in enumerate(steps, 1):
                operation()
                # Exactly one new warning per step, checked on the newest.
                self.assertEqual(len(caught), count)
                self.assertTrue(fragment in str(caught[-1].message))
def test_prepend_not_one(self):
    assign = self.assign
    s_ = np.s_
    target = np.zeros(5)

    # Too large and not only ones.
    assert_raises(ValueError, assign, target, s_[...], np.ones((2, 1)))

    # Index/value pairs that currently only raise a deprecation.
    deprecated_cases = (
        (s_[[1, 2, 3],], np.ones((2, 1))),
        (s_[[[1], [2]],], np.ones((2, 2, 1))),
    )
    with warnings.catch_warnings():
        # Will be a ValueError as well.
        warnings.simplefilter("error", DeprecationWarning)
        for index, value in deprecated_cases:
            assert_raises(DeprecationWarning, assign, target, index, value)
def test_closing_fid(self):
    # Test that issue #1517 (too many opened files) remains closed
    # It might be a "weak" test since failed to get triggered on
    # e.g. Debian sid of 2012 Jul 05 but was reported to
    # trigger the failure on Ubuntu 10.04:
    # http://projects.scipy.org/numpy/ticket/1517#comment:2
    attempts = 1024
    with temppath(suffix='.npz') as tmp:
        np.savez(tmp, data='LOVELY LOAD')
        # We need to check if the garbage collector can properly close
        # numpy npz file returned by np.load when their reference count
        # goes to zero.  Python 3 running in debug mode raises a
        # ResourceWarning when file closing is left to the garbage
        # collector, so we catch the warnings.  Because ResourceWarning
        # is unknown in Python < 3.x, we take the easy way out and
        # catch all warnings.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(attempts):
                try:
                    np.load(tmp)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_ddof_too_big(self):
    nan_functions = [np.nanvar, np.nanstd]
    std_functions = [np.var, np.std]
    row_sizes = [len(row) for row in _rdat]

    # The std functions only pair up with the nan functions; just the nan
    # variants are exercised here.
    for nan_func, _std_func in zip(nan_functions, std_functions):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter('always')
                # A row is expected to give NaN once ddof reaches its size.
                expect_nan = [ddof >= size for size in row_sizes]
                result = nan_func(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(result), expect_nan)
                if not any(expect_nan):
                    assert_(len(caught) == 0)
                else:
                    assert_(len(caught) == 1)
                    assert_(issubclass(caught[0].category, RuntimeWarning))
def test_allnans(self):
    all_nan = np.array([np.nan] * 9).reshape(3, 3)
    for axis in [None, 0, 1]:
        # Expected number of warnings after the matrix call.
        after_matrix = 1 if axis is None else 3
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter('always')
            assert_(np.isnan(np.nanpercentile(all_nan, 60, axis=axis)).all())
            assert_(len(caught) == after_matrix)
            assert_(issubclass(caught[0].category, RuntimeWarning))
            # Check scalar
            assert_(np.isnan(np.nanpercentile(np.nan, 60)))
            # The scalar call adds exactly one more warning.
            assert_(len(caught) == after_matrix + 1)
            assert_(issubclass(caught[0].category, RuntimeWarning))
def test_clear_and_catch_warnings():
    # Initial state of module, no warnings
    fresh_module = _get_fresh_mod()
    assert_equal(getattr(fresh_module, '__warningregistry__', {}), {})

    # With the module listed, its registry is expected to stay empty.
    with clear_and_catch_warnings(modules=[fresh_module]):
        warnings.simplefilter('ignore')
        warnings.warn('Some warning')
    assert_equal(fresh_module.__warningregistry__, {})

    # Without specified modules, don't clear warnings during context
    with clear_and_catch_warnings():
        warnings.simplefilter('ignore')
        warnings.warn('Some warning')
    assert_warn_len_equal(fresh_module, 1)

    # Confirm that specifying module keeps old warning, does not add new
    with clear_and_catch_warnings(modules=[fresh_module]):
        warnings.simplefilter('ignore')
        warnings.warn('Another warning')
    assert_warn_len_equal(fresh_module, 1)

    # Another warning, no module spec does add to warnings dict, except on
    # Python 3.4 (see comments in `assert_warn_len_equal`)
    with clear_and_catch_warnings():
        warnings.simplefilter('ignore')
        warnings.warn('Another warning')
    assert_warn_len_equal(fresh_module, 2)
def test_topython(self):
    # Tests some communication issues with Python.
    # Single-element arrays of any depth convert to Python scalars.
    assert_equal(1, int(array(1)))
    assert_equal(1.0, float(array(1)))
    assert_equal(1, int(array([[[1]]])))
    assert_equal(1.0, float(array([[1]])))
    # More than one element cannot be converted.
    self.assertRaises(TypeError, float, array([1, 1]))

    with warnings.catch_warnings():
        warnings.simplefilter('ignore', UserWarning)
        # A fully masked single element converts to NaN.
        assert_(np.isnan(float(array([1], mask=[1]))))

        a = array([1, 2, 3], mask=[1, 0, 0])
        self.assertRaises(TypeError, lambda: float(a))
        assert_equal(float(a[-1]), 3.)
        self.assertTrue(np.isnan(float(a[0])))
    self.assertRaises(TypeError, int, a)
    assert_equal(int(a[-1]), 3)
    self.assertRaises(MAError, lambda: int(a[0]))
def test_warning_raised(self):
    expected = "lololol"

    # Decorator arguments paired with the warning class they should emit.
    cases = [
        # No args just means deprecated
        {"args": {},
         "warning": deprecation.DeprecatedWarning},
        {"args": {"deprecated_in": "1.0",
                  "current_version": "2.0"},
         "warning": deprecation.DeprecatedWarning},
        {"args": {"deprecated_in": "1.0",
                  "removed_in": "2.0",
                  "current_version": "2.0"},
         "warning": deprecation.UnsupportedWarning},
    ]

    for case in cases:
        with self.subTest(**case):
            class Test(object):
                @deprecation.deprecated(**case["args"])
                def method(self):
                    return expected

            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter("always")

                instance = Test()
                self.assertEqual(expected, instance.method())

            # Exactly one warning of the expected class per call.
            self.assertEqual(len(caught), 1)
            self.assertEqual(caught[0].category, case["warning"])
def disable_warnings(category=exceptions.HTTPWarning):
    """
    Helper for quickly disabling all urllib3 warnings.

    Installs an 'ignore' filter for *category*.
    """
    warnings.simplefilter(action='ignore', category=category)
def install_warning_logger():
    """Enable PipDeprecationWarning and install the custom warning hook."""
    global _warnings_showwarning

    # Enable our Deprecation Warnings
    warnings.simplefilter("default", PipDeprecationWarning, append=True)

    # Swap the hook only once, keeping a reference to the previous one.
    if _warnings_showwarning is not None:
        return
    _warnings_showwarning = warnings.showwarning
    warnings.showwarning = _showwarning
def main(argv):
    """Look up one sensor through the Cb API and pretty-print it.

    Exits with status -1 after printing a usage hint when the url, token or
    sensor option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)

    required = (opts.url, opts.token, opts.sensor)
    if not all(required):
        # Parenthesised single-argument form: valid on Python 2 and 3.
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify,
                     ignore_system_proxy=True)

    # Warnings emitted during the lookup are suppressed.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        sensor = cb.sensor(opts.sensor)
    pprint.pprint(sensor)
def test_propertyInitialization(self):
    """
    Tests for the correct initialization of 'property' kind properties
    based on whether command line is set, and overrides via launch().
    """
    spd = 'sdr/dom/components/property_init/property_init.spd.xml'

    # First, test with defaults
    comp = sb.launch(spd)
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    comp.releaseObject()

    # Test with overrides
    comp = sb.launch(spd, properties={'cmdline': 'override',
                                      'initial': 'override'})
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual('override', comp.cmdline)
    self.assertEqual('override', comp.initial)
    comp.releaseObject()

    # Test with overrides in deprecated 'execparams' and 'configure'
    # arguments
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        comp = sb.launch(spd, execparams={'cmdline': 'override'},
                         configure={'initial': 'override'})
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    self.assertEqual('override', comp.cmdline)
    self.assertEqual('override', comp.initial)
    comp.releaseObject()

    # Test with misplaced command line property in deprecated 'configure'
    # argument
    comp = sb.launch(spd, configure={'cmdline': 'override'})
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    self.assertEqual('override', comp.cmdline)
    comp.releaseObject()
def test_slider_slice_warning(self):
    # Calling the plot with the dimension name although an alias was
    # registered is expected to warn.
    plot = Plot2D(self.cube, self.axes, coords=('time', 'grid_latitude'))
    plot.alias(wibble=2)

    expected_message = ("expected to be called with alias 'wibble' for dimension 2, "
                        "rather than with 'grid_longitude'")
    call_kwargs = {'grid_longitude': 3}

    with warnings.catch_warnings():
        # Cause all warnings to raise an exception.
        warnings.simplefilter('error')
        with self.assertRaisesRegexp(UserWarning, expected_message):
            plot(**call_kwargs)
def test_auto_deltas_fail_warn(self):
    # With the "warn" rule, from_blaze is expected to emit exactly one
    # NoDeltasWarning that mentions the expression.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        loader = BlazeLoader()
        expr = bz.data(self.df, dshape=self.dshape)
        from_blaze(
            expr,
            loader=loader,
            no_deltas_rule=no_deltas_rules.warn,
            missing_values=self.missing_values,
        )

    self.assertEqual(len(caught), 1)
    message = caught[0].message
    self.assertIsInstance(message, NoDeltasWarning)
    self.assertIn(str(expr), str(message))
def setUp(self):
    # An 'ignore' filter is installed to suppress ``ResourceWarning``.
    # Another solution is to write the following lines when you call KNP.
    # >> knp.subprocess.process.stdout.close()
    # >> knp.juman.subprocess.process.stdout.close()
    warnings.simplefilter(action='ignore', category=ResourceWarning)
def __init__(self):
    """Warn that this client is deprecated and create its empty maps."""
    # Make the deprecation visible on every instantiation.
    warnings.simplefilter('always', category=DeprecationWarning)
    warnings.warn(
        "data_pipeline.schema_cache.SchematizerClient is deprecated.",
        DeprecationWarning)

    # Each cache attribute starts out as its own empty dict.
    cache_attributes = (
        'schema_id_to_schema_map',
        'schema_id_to_topic_map',
        'base_to_transformed_schema_id_map',
        'schema_id_to_pii_map',
    )
    for attribute in cache_attributes:
        setattr(self, attribute, {})
def index_worker(self, queue, size=200):
    """Consume ``(id_submission, analysis)`` items and bulk-update them.

    Items are read from *queue* until a ``None`` sentinel arrives. Each item
    becomes an update action for the 'fcc-comments' index, and actions are
    sent with ``bulk`` in batches of *size*. Whatever is left when the
    sentinel arrives is sent in one final call.

    Args:
        queue: object whose ``get()`` returns the next item or ``None``.
        size: number of pending actions that triggers a bulk request.
    """
    actions = []
    indexed = 0
    while True:
        item = queue.get()
        if item is None:
            break
        id_submission, analysis = item
        actions.append({
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': id_submission,
            'doc': {'analysis': analysis},
        })
        # ``>=`` rather than ``==``: a batch that timed out stays pending,
        # so the list may already be larger than ``size``. With ``==`` the
        # flush would never trigger again after the first timeout.
        if len(actions) >= size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    # The first element of the result is added to the count.
                    indexed += response[0]
                    print('\tanalyzed %s/%s\t%s%%' % (
                        indexed, self.limit,
                        int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    # Keep the batch; it is retried on the next flush.
                    print('error indexing: connection timeout')

    # Final flush of whatever is still pending.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))
def bulk_index(self, queue, size=20):
    """Flag queued documents as having ordered sentiment terms.

    Document ids are read from *queue* until a ``None`` sentinel arrives.
    Each id becomes an update action for the 'fcc-comments' index, and
    actions are sent with ``bulk`` in batches of *size*. Whatever is left
    when the sentinel arrives is sent in one final call.

    Args:
        queue: object whose ``get()`` returns the next id or ``None``.
        size: number of pending actions that triggers a bulk request.
    """
    actions = []
    indexed = 0
    while True:
        doc_id = queue.get()
        if doc_id is None:
            break
        actions.append({
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': doc_id,
            'doc': {'analysis.sentiment_sig_terms_ordered': True},
        })
        # ``>=`` rather than ``==``: a batch that timed out stays pending,
        # so the list may already be larger than ``size``. With ``==`` the
        # flush would never trigger again after the first timeout.
        if len(actions) >= size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    # The first element of the result is added to the count.
                    indexed += response[0]
                    # Progress is only printed on multiples of 200.
                    if not indexed % 200:
                        print('\tindexed %s/%s\t%s%%' % (
                            indexed, self.limit,
                            int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    # Keep the batch; it is retried on the next flush.
                    print('error indexing: connection timeout')

    # Final flush of whatever is still pending.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))
def make_table(self, *args, **kwargs):
    # type: (*Any, **Any) -> str
    """Build the table via the parent class, then post-process markers."""
    def unescape_zeroonetwo(m):
        # Map the captured digit '0'/'1'/'2' to the character with that code.
        return unichr(ord(m.group(1)) - ord('0'))

    with warnings.catch_warnings():
        # PendingDeprecationWarning acknowledged.
        warnings.simplefilter("ignore")
        raw_table = super(HtmlMultiDiff, self).make_table(*args, **kwargs)

    # Replace every "\x02<digit>" pair with the unescaped character.
    return re.sub('\x02([012])', unescape_zeroonetwo, raw_table)
def get_random_giphy(phrase):
    """Return the URL of a random GIF related to the phrase, if possible"""
    # Warnings emitted while creating the client and searching are hidden.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        client = giphypop.Giphy()
        matches = client.search_list(phrase=phrase, limit=100)

    if matches:
        return random.choice(matches).media_url
    raise ValueError('There were no results for that phrase')