我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用warnings.catch_warnings()。
def fail_if_not_removed(method): """Decorate a test method to track removal of deprecated code This decorator catches :class:`~deprecation.UnsupportedWarning` warnings that occur during testing and causes unittests to fail, making it easier to keep track of when code should be removed. :raises: :class:`AssertionError` if an :class:`~deprecation.UnsupportedWarning` is raised while running the test method. """ def _inner(*args, **kwargs): with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter("always") rv = method(*args, **kwargs) for warning in caught_warnings: if warning.category == UnsupportedWarning: raise AssertionError( ("%s uses a function that should be removed: %s" % (method, str(warning.message)))) return rv return _inner
def test_DeprecatedWarning_not_raised(self): ret_val = "lololol" class Test(object): @deprecation.deprecated(deprecated_in="2.0", removed_in="3.0", current_version="1.0") def method(self): """method docstring""" return ret_val with warnings.catch_warnings(record=True): # If a warning is raised it'll be an exception, so we'll fail. warnings.simplefilter("error") sot = Test() self.assertEqual(sot.method(), ret_val)
def cache_from_source(path, debug_override=None): """**DEPRECATED** Given the path to a .py file, return the path to its .pyc file. The .py file does not need to exist; this simply returns the path to the .pyc file calculated as if the .py file were imported. If debug_override is not None, then it must be a boolean and is used in place of sys.flags.optimize. If sys.implementation.cache_tag is None then NotImplementedError is raised. """ with warnings.catch_warnings(): warnings.simplefilter('ignore') return util.cache_from_source(path, debug_override)
def test_security_dates_warning(self): # Build an asset with an end_date eq_end = pd.Timestamp('2012-01-01', tz='UTC') equity_asset = Equity(1, symbol="TESTEQ", end_date=eq_end) # Catch all warnings with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered warnings.simplefilter("always") equity_asset.security_start_date equity_asset.security_end_date equity_asset.security_name # Verify the warning self.assertEqual(3, len(w)) for warning in w: self.assertTrue(issubclass(warning.category, DeprecationWarning))
def test_analogsignal_shortening_warning(self): nio = NeuralynxIO(self.sn, use_cache='never') with reset_warning_registry(): with warnings.catch_warnings(record=True) as w: seg = Segment('testsegment') nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg) self.assertGreater(len(w), 0) self.assertTrue(issubclass(w[0].category, UserWarning)) self.assertTrue('Analogsignalarray was shortened due to gap in' ' recorded data of file' in str(w[0].message)) # This class is copied from # 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins # and is related to http://bugs.python.org/issue21724 Python<3.4
def test_callbacks_work_multiple_times(self): """ Tests that multiple executions of execute on a batch statement logs a warning, and that we don't encounter an attribute error. @since 3.1 @jira_ticket PYTHON-445 @expected_result warning message is logged @test_category object_mapper """ call_history = [] def my_callback(*args, **kwargs): call_history.append(args) with warnings.catch_warnings(record=True) as w: with BatchQuery() as batch: batch.add_callback(my_callback) batch.execute() batch.execute() self.assertEqual(len(w), 2) # package filter setup to warn always self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
def test_disable_multiple_callback_warning(self): """ Tests that multiple executions of a batch statement don't log a warning when warn_multiple_exec flag is set, and that we don't encounter an attribute error. @since 3.1 @jira_ticket PYTHON-445 @expected_result warning message is logged @test_category object_mapper """ call_history = [] def my_callback(*args, **kwargs): call_history.append(args) with patch('cassandra.cqlengine.query.BatchQuery.warn_multiple_exec', False): with warnings.catch_warnings(record=True) as w: with BatchQuery() as batch: batch.add_callback(my_callback) batch.execute() batch.execute() self.assertFalse(w)
def assert_warnings(fn, warning_msgs, regex=False): """Assert that each of the given warnings are emitted by fn.""" from .assertions import eq_ with warnings.catch_warnings(record=True) as log: # ensure that nothing is going into __warningregistry__ warnings.filterwarnings("always") result = fn() for warning in log: popwarn = warning_msgs.pop(0) if regex: assert re.match(popwarn, str(warning.message)) else: eq_(popwarn, str(warning.message)) return result
def test_warnings(self): """Are warnings printed at the right time?""" # Setting up a more complicated repository sub.call(["rm", "-rf", ".git"]) sub.call(["cp", "-R", "repo_nx.git", ".git"]) path = os.getcwd() nx_log = gitnet.get_log(path) # Note: Depending on the environment a warning may or may not be raised. For example, PyCharm uses UTF-8 # encoding and thus will not raised the unicode error. with warnings.catch_warnings(record=True) as w: # Ensure warnings are being shown warnings.simplefilter("always") # Trigger Warning nx_log.detect_dup_emails() # Check warning occurs self.assertTrue(len(w) == 0 or len(w) == 1)
def test_undecorated_coroutine(self): namespace = exec_test(globals(), locals(), """ class Test(AsyncTestCase): async def test_coro(self): pass """) test_class = namespace['Test'] test = test_class('test_coro') result = unittest.TestResult() # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited". with warnings.catch_warnings(): warnings.simplefilter('ignore') test.run(result) self.assertEqual(len(result.errors), 1) self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner): body = [None] @gen.coroutine def f(): # This is simpler than the non-coroutine version, but it cheats # by reading the body in one blob instead of streaming it with # a Protocol. client = Agent(self.reactor) response = yield client.request(b'GET', utf8(url)) with warnings.catch_warnings(): # readBody has a buggy DeprecationWarning in Twisted 15.0: # https://twistedmatrix.com/trac/changeset/43379 warnings.simplefilter('ignore', category=DeprecationWarning) body[0] = yield readBody(response) self.stop_loop() self.io_loop.add_callback(f) runner() return body[0]
def test_callbacks_work_multiple_times(self): """ Tests that multiple executions of execute on a batch statement logs a warning, and that we don't encounter an attribute error. @since 3.1 @jira_ticket PYTHON-445 @expected_result warning message is logged @test_category object_mapper """ call_history = [] def my_callback(*args, **kwargs): call_history.append(args) with warnings.catch_warnings(record=True) as w: batch = Batch(self.conn) batch.add_callback(my_callback) batch.execute_batch() batch.execute_batch() self.assertEqual(len(w), 1) self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
def execute(self): def showwarning(message, category, filename, fileno, file=None, line=None): msg = warnings.formatwarning(message, category, filename, fileno, line) self.log.warning(msg.strip()) with warnings.catch_warnings(): warnings.simplefilter("always") warnings.showwarning = showwarning try: return self.run() except SystemExit: raise except: self.log.critical(traceback.format_exc().strip()) sys.exit(1)
def guesscaption(simple=False): output = "" gf = guessformat() print("raw: ") for g in gf: print(guessline(g, True)) print() with warnings.catch_warnings(): warnings.simplefilter("ignore") for g in gf: gl = guessline(g, simple=simple) if not isgibber(gl): output += gl.replace(" !", "!").replace(" ?", "?") + "\n" return output
def test_backwards_compat(self): with warnings.catch_warnings(record=True) as w: class NewProperty(properties.Property): info_text = 'new property' def info(self): return self.info_text assert len(w) == 2 assert issubclass(w[0].category, FutureWarning) np = NewProperty('') assert getattr(np, 'class_info', None) == 'new property' assert getattr(np, 'info', None) == 'new property'
def test_norm_hash_name(self): """norm_hash_name()""" from itertools import chain from passlib.crypto.digest import norm_hash_name, _known_hash_names # snapshot warning state, ignore unknown hash warnings ctx = warnings.catch_warnings() ctx.__enter__() self.addCleanup(ctx.__exit__) warnings.filterwarnings("ignore", '.*unknown hash') # test string types self.assertEqual(norm_hash_name(u("MD4")), "md4") self.assertEqual(norm_hash_name(b"MD4"), "md4") self.assertRaises(TypeError, norm_hash_name, None) # test selected results for row in chain(_known_hash_names, self.norm_hash_samples): for idx, format in enumerate(self.norm_hash_formats): correct = row[idx] for value in row: result = norm_hash_name(value, format) self.assertEqual(result, correct, "name=%r, format=%r:" % (value, format))
def test_bad_val_in_column_descriptions(): np.random.seed(0) df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset() column_descriptions = { 'survived': 'output' , 'embarked': 'categorical' , 'pclass': 'categorical' , 'fare': 'this_is_a_bad_value' } with warnings.catch_warnings(record=True) as w: ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions) print('we should be throwing a warning for the user to give them useful feedback') assert len(w) == 1
def test_throws_warning_when_fl_data_equals_df_train(): df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset() column_descriptions = { 'survived': 'output' , 'embarked': 'categorical' , 'pclass': 'categorical' } ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions) with warnings.catch_warnings(record=True) as w: try: ml_predictor.train(df_titanic_train, feature_learning=True, fl_data=df_titanic_train) except KeyError as e: pass # We should not be getting to this line- we should be throwing an error above for thing in w: print(thing) assert len(w) == 1
def test_remove_missing(): df = pd.DataFrame({'a': [1.0, np.NaN, 3, np.inf], 'b': [1, 2, 3, 4]}) df2 = pd.DataFrame({'a': [1.0, 3, np.inf], 'b': [1, 3, 4]}) df3 = pd.DataFrame({'a': [1.0, 3], 'b': [1, 3]}) with warnings.catch_warnings(record=True) as w: res = remove_missing(df, na_rm=True, vars=['b']) res.equals(df) res = remove_missing(df) res.equals(df2) res = remove_missing(df, na_rm=True, finite=True) res.equals(df3) assert len(w) == 1
def get_credentials(flags): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ store = Storage(flags.credfile) with warnings.catch_warnings(): warnings.simplefilter("ignore") credentials = store.get() if not credentials or credentials.invalid: flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL) credentials = tools.run_flow(flow, store, flags) print('credential file saved at\n\t' + flags.credfile) return credentials
def test_loadTestsFromModule__load_tests(self): m = types.ModuleType('m') class MyTestCase(unittest2.TestCase): def test(self): pass m.testcase_1 = MyTestCase load_tests_args = [] def load_tests(loader, tests, pattern): self.assertIsInstance(tests, unittest2.TestSuite) load_tests_args.extend((loader, tests, pattern)) return tests m.load_tests = load_tests loader = unittest2.TestLoader() suite = loader.loadTestsFromModule(m) self.assertIsInstance(suite, unittest2.TestSuite) self.assertEqual(load_tests_args, [loader, suite, None]) # With Python 3.5, the undocumented and unofficial use_load_tests is # ignored (and deprecated). load_tests_args = [] with warnings.catch_warnings(record=False): warnings.simplefilter('never') suite = loader.loadTestsFromModule(m, use_load_tests=False) self.assertEqual(load_tests_args, [loader, suite, None])
def test_loadTestsFromModule__use_load_tests_other_bad_keyword(self): m = types.ModuleType('m') class MyTestCase(unittest.TestCase): def test(self): pass m.testcase_1 = MyTestCase load_tests_args = [] def load_tests(loader, tests, pattern): self.assertIsInstance(tests, unittest.TestSuite) load_tests_args.extend((loader, tests, pattern)) return tests m.load_tests = load_tests loader = unittest.TestLoader() with warnings.catch_warnings(): warnings.simplefilter('never') with self.assertRaises(TypeError) as cm: loader.loadTestsFromModule( m, use_load_tests=False, very_bad=True, worse=False) self.assertEqual(type(cm.exception), TypeError) # The error message names the first bad argument alphabetically, # however use_load_tests (which sorts first) is ignored. self.assertEqual( str(cm.exception), "loadTestsFromModule() got an unexpected keyword argument 'very_bad'")
def _build_program(self): code = "\n".join(itertools.chain(self.common_header.pieces, itertools.chain.from_iterable(cu.pieces for cu in self._compile_units))) program = pyopencl.Program(self.context, code) with warnings.catch_warnings(): # Intel OpenCL generates non empty output and that causes warnings # from pyopencl. We just silence them. warnings.simplefilter("ignore") return program.build(options=DEFAULT_COMPILER_OPTIONS) # Working around bug in pyopencl.Program.compile in pyopencl # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html) # TODO: Fix this in OpenCL # compiled_units = [unit.compile(self.context, self.common_header.pieces) # for unit in self._compile_units] # return pyopencl.link_program(self.context, compiled_units)
def test_pipe_handle(self): h, _ = windows_utils.pipe(overlapped=(True, True)) _winapi.CloseHandle(_) p = windows_utils.PipeHandle(h) self.assertEqual(p.fileno(), h) self.assertEqual(p.handle, h) # check garbage collection of p closes handle with warnings.catch_warnings(): warnings.filterwarnings("ignore", "", ResourceWarning) del p support.gc_collect() try: _winapi.CloseHandle(h) except OSError as e: self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE else: raise RuntimeError('expected ERROR_INVALID_HANDLE')
def test_warning_raised(self): ret_val = "lololol" for test in [{"args": {}, # No args just means deprecated "warning": deprecation.DeprecatedWarning}, {"args": {"deprecated_in": "1.0", "current_version": "2.0"}, "warning": deprecation.DeprecatedWarning}, {"args": {"deprecated_in": "1.0", "removed_in": "2.0", "current_version": "2.0"}, "warning": deprecation.UnsupportedWarning}]: with self.subTest(**test): class Test(object): @deprecation.deprecated(**test["args"]) def method(self): return ret_val with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter("always") sot = Test() self.assertEqual(ret_val, sot.method()) self.assertEqual(len(caught_warnings), 1) self.assertEqual(caught_warnings[0].category, test["warning"])
def main(argv): parser = build_cli_parser() opts, args = parser.parse_args(argv) if not opts.url or not opts.token or not opts.sensor: print "Missing required param; run with --help for usage" sys.exit(-1) cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify, ignore_system_proxy=True) with warnings.catch_warnings(): warnings.simplefilter("ignore") sensor = cb.sensor(opts.sensor) pprint.pprint(sensor)
def __init__(self, period): # Quiet the warning about GTK Tooltip deprecation import warnings with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=DeprecationWarning) self.figure = pyplot.figure() self.figure.canvas.set_window_title('REDHAWK Speedometer') # Create a line graph of throughput over time self.axes = self.figure.add_subplot(111) self.axes.set_xlabel('Time') self.axes.set_ylabel('Throughput (Bps)') self.x = [] self.y = [] self.line, = self.axes.plot(self.x, self.y) self.axes.set_xlim(xmax=period) self.figure.subplots_adjust(bottom=0.2) sliderax = self.figure.add_axes([0.2, 0.05, 0.6, 0.05]) min_size = 1024 max_size = 8*1024*1024 default_size = 1*1024*1024 self.slider = matplotlib.widgets.Slider(sliderax, 'Transfer size', min_size, max_size, default_size, '%.0f') self.figure.show()
def test_propertyInitialization(self): """ Tests for the correct initialization of 'property' kind properties based on whether command line is set, and overrides via launch(). """ # First, test with defaults comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml') self.assertFalse('initial' in comp.cmdline_args) self.assertFalse('cmdline' in comp.initialize_props) comp.releaseObject() # Test with overrides comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml', properties={'cmdline':'override', 'initial':'override'}) self.assertFalse('initial' in comp.cmdline_args) self.assertFalse('cmdline' in comp.initialize_props) self.assertEquals('override', comp.cmdline) self.assertEquals('override', comp.initial) comp.releaseObject() # Test with overrides in deprecated 'execparams' and 'configure' arguments with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml', execparams={'cmdline':'override'}, configure={'initial':'override'}) self.assertFalse('initial' in comp.cmdline_args) self.assertFalse('cmdline' in comp.initialize_props) self.assertEquals('override', comp.cmdline) self.assertEquals('override', comp.initial) comp.releaseObject() # Test with misplaced command line property in deprecated 'configure' argument comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml', configure={'cmdline':'override'}) self.assertFalse('initial' in comp.cmdline_args) self.assertFalse('cmdline' in comp.initialize_props) self.assertEquals('override', comp.cmdline) comp.releaseObject()
def test_ignore_nan(self): """ Test that NaNs are handled correctly """ stream = [np.random.random(size = (16,12)) for _ in range(5)] for s in stream: s[randint(0, 15), randint(0,11)] = np.nan with catch_warnings(): simplefilter('ignore') from_iaverage = last(iaverage(stream, ignore_nan = True)) from_numpy = np.nanmean(np.dstack(stream), axis = 2) self.assertTrue(np.allclose(from_iaverage, from_numpy))
def test_ddof(self): """ Test that the ddof parameter is equivalent to numpy's """ stream = [np.random.random((16, 7, 3)) for _ in range(10)] stack = np.stack(stream, axis = -1) with catch_warnings(): simplefilter('ignore') for axis in (0, 1, 2, None): for ddof in range(4): with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)): from_numpy = np.var(stack, axis = axis, ddof = ddof) from_ivar = last(ivar(stream, axis = axis, ddof = ddof)) self.assertSequenceEqual(from_numpy.shape, from_ivar.shape) self.assertTrue(np.allclose(from_ivar, from_numpy))
def test_against_numpy_std(self): stream = [np.random.random((16, 7, 3)) for _ in range(10)] stack = np.stack(stream, axis = -1) with catch_warnings(): simplefilter('ignore') for axis in (0, 1, 2, None): for ddof in range(4): with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)): from_numpy = np.std(stack, axis = axis, ddof = ddof) from_ivar = last(istd(stream, axis = axis, ddof = ddof)) self.assertSequenceEqual(from_numpy.shape, from_ivar.shape) self.assertTrue(np.allclose(from_ivar, from_numpy))
def test_slider_slice_warning(self): index = 3 kwargs = dict(grid_longitude=index) coords = ('time', 'grid_latitude') plot = Plot2D(self.cube, self.axes, coords=coords) plot.alias(wibble=2) wmsg = ("expected to be called with alias 'wibble' for dimension 2, " "rather than with 'grid_longitude'") with warnings.catch_warnings(): # Cause all warnings to raise an exception. warnings.simplefilter('error') with self.assertRaisesRegexp(UserWarning, wmsg): plot(**kwargs)
def test_auto_deltas_fail_warn(self): with warnings.catch_warnings(record=True) as ws: warnings.simplefilter('always') loader = BlazeLoader() expr = bz.data(self.df, dshape=self.dshape) from_blaze( expr, loader=loader, no_deltas_rule=no_deltas_rules.warn, missing_values=self.missing_values, ) self.assertEqual(len(ws), 1) w = ws[0].message self.assertIsInstance(w, NoDeltasWarning) self.assertIn(str(expr), str(w))
def test_warning_from_explicit_topic(self, valid_message_data): data_with_topic = self._make_message_data( valid_message_data, topic=str('explicit_topic') ) with warnings.catch_warnings(record=True) as w: self.message_class(**data_with_topic) assert len(w) == 1 assert "Passing in topics explicitly is deprecated." in w[0].message
def test_specify_contains_pii_triggers_warnings(self, valid_message_data): message_data = self._make_message_data( valid_message_data, contains_pii=False ) with warnings.catch_warnings(record=True) as warns: self.message_class(**message_data) assert len(warns) == 1 contains_pii_warning = warns[0] assert issubclass(contains_pii_warning.category, DeprecationWarning) assert ('contains_pii is deprecated. Please stop passing it in.' in contains_pii_warning.message)
def test_specify_keys_triggers_warnings(self, valid_message_data): message_data = self._make_message_data( valid_message_data, keys=(1, 2) ) with warnings.catch_warnings(record=True) as warns: self.message_class(**message_data) assert len(warns) == 1 keys_warning = warns[0] assert issubclass(keys_warning.category, DeprecationWarning) assert ('Passing in keys explicitly is deprecated.' in keys_warning.message)
def assertItemsEqual(self, expected_seq, actual_seq, msg=None): """An unordered sequence specific comparison. It asserts that actual_seq and expected_seq have the same element counts. Equivalent to:: self.assertEqual(Counter(iter(actual_seq)), Counter(iter(expected_seq))) Asserts that each element has the same count in both sequences. Example: - [0, 1, 1] and [1, 0, 1] compare equal. - [0, 0, 1] and [0, 1] compare unequal. """ first_seq, second_seq = list(actual_seq), list(expected_seq) with warnings.catch_warnings(): if sys.py3kwarning: # Silence Py3k warning raised during the sorting for _msg in ["(code|dict|type) inequality comparisons", "builtin_function_or_method order comparisons", "comparing unequal types"]: warnings.filterwarnings("ignore", _msg, DeprecationWarning) try: first = collections.Counter(first_seq) second = collections.Counter(second_seq) except TypeError: # Handle case with unhashable elements differences = _count_diff_all_purpose(first_seq, second_seq) else: if first == second: return differences = _count_diff_hashable(first_seq, second_seq) if differences: standardMsg = 'Element counts were not equal:\n' lines = ['First has %d, Second has %d: %r' % diff for diff in differences] diffMsg = '\n'.join(lines) standardMsg = self._truncateMessage(standardMsg, diffMsg) msg = self._formatMessage(msg, standardMsg) self.fail(msg)
def initiate_send(self): while self.producer_fifo and self.connected: first = self.producer_fifo[0] # handle empty string/buffer or None entry if not first: del self.producer_fifo[0] if first is None: self.handle_close() return # handle classic producer behavior obs = self.ac_out_buffer_size try: with catch_warnings(): if py3kwarning: filterwarnings("ignore", ".*buffer", DeprecationWarning) data = buffer(first, 0, obs) except TypeError: data = first.more() if data: self.producer_fifo.appendleft(data) else: del self.producer_fifo[0] continue # send the data try: num_sent = self.send(data) except socket.error: self.handle_error() return if num_sent: if num_sent < len(data) or obs < len(first): self.producer_fifo[0] = first[num_sent:] else: del self.producer_fifo[0] # we tried to send some actual data return
def _sorted(iterable): with warnings.catch_warnings(): if _sys.py3kwarning: warnings.filterwarnings("ignore", "comparing unequal types " "not supported", DeprecationWarning) return sorted(iterable)
def index_worker(self, queue, size=200): actions = [] indexed = 0 while True: item = queue.get() if item is None: break id_submission, analysis = item doc = { '_index': 'fcc-comments', '_type': 'document', '_op_type': 'update', '_id': id_submission, 'doc': {'analysis': analysis}, } actions.append(doc) if len(actions) == size: with warnings.catch_warnings(): warnings.simplefilter('ignore') try: response = bulk(self.es, actions) indexed += response[0] print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit, int(indexed / self.limit * 100))) actions = [] except ConnectionTimeout: print('error indexing: connection timeout') with warnings.catch_warnings(): warnings.simplefilter('ignore') response = bulk(self.es, actions) indexed += response[0] print('indexed %s' % (indexed))
def bulk_index(self, queue, size=20): actions = [] indexed = 0 ids = set() while True: item = queue.get() if item is None: break doc_id = item doc = { '_index': 'fcc-comments', '_type': 'document', '_op_type': 'update', '_id': doc_id, 'doc': {'analysis.sentiment_sig_terms_ordered': True}, } actions.append(doc) ids.add(doc_id) if len(actions) == size: with warnings.catch_warnings(): warnings.simplefilter('ignore') try: response = bulk(self.es, actions) indexed += response[0] if not indexed % 200: print('\tindexed %s/%s\t%s%%' % (indexed, self.limit, int(indexed / self.limit * 100))) actions = [] except ConnectionTimeout: print('error indexing: connection timeout') with warnings.catch_warnings(): warnings.simplefilter('ignore') response = bulk(self.es, actions) indexed += response[0] print('indexed %s' % (indexed)) ids = list(ids) #print('%s\n%s' % (len(ids), ' '.join(ids))
def make_table(self, *args, **kwargs): # type: (*Any, **Any) -> str with warnings.catch_warnings(): # PendingDeprecationWarning acknowledged. warnings.simplefilter("ignore") table = super(HtmlMultiDiff, self).make_table(*args, **kwargs) def unescape_zeroonetwo(m): return unichr(ord(m.group(1)) - ord('0')) table = re.sub('\x02([012])', unescape_zeroonetwo, table) return table
def get_random_giphy(phrase): """Return the URL of a random GIF related to the phrase, if possible""" with warnings.catch_warnings(): warnings.simplefilter('ignore') giphy = giphypop.Giphy() results = giphy.search_list(phrase=phrase, limit=100) if not results: raise ValueError('There were no results for that phrase') return random.choice(results).media_url
def test(): """ little test, NOT FOR REAL USE, put target filename as first and only argument will plot the pitch and strength v. time to screen safety is not guaranteed. """ # imports specific to this test import sys import warnings from scipy.io import wavfile import matplotlib.pyplot as plt from psola.experiment_config import ExperimentConfig # get the data and do the estimation filename = sys.argv[1] # filename is first command line arg cfg = ExperimentConfig() # use default settings with warnings.catch_warnings(): warnings.simplefilter("ignore") # ignore annoying WavFileWarning fs, data = wavfile.read(filename) pitch, t, strength = pitch_estimation(data, fs, cfg) # Plot estimated pitch and strength of pitch values f, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(16,9)) ax1.plot(t, pitch); ax1.set_title('Pitch v. Time'); ax1.set_ylabel('Freq (Hz)') ax2.plot(t, strength); ax2.set_title('Strength v. Time'); ax1.set_ylabel('Strength') plt.show()