我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用retrying.retry()。
def update_heartbeat(): @retry( stop_max_delay=30000, # 30 seconds max wait_exponential_multiplier=100, # wait 2^i * 100 ms, on the i-th retry wait_exponential_max=1000, # but wait 1 second per try maximum wrap_exception=True ) def retry_fetch_fail_after_30sec(): return requests.post( config['webservice']['shifthelperHeartbeat'], auth=( config['webservice']['user'], config['webservice']['password'] ) ).json() try: return retry_fetch_fail_after_30sec() except RetryError as e: return {}
def _do_work(self, params, fn): pod_name = params.args.K8S_POD_NAME timeout = CONF.cni_daemon.vif_annotation_timeout # In case of KeyError retry for `timeout` s, wait 1 s between tries. @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000, retry_on_exception=lambda e: isinstance(e, KeyError)) def find(): return self.registry[pod_name] try: d = find() pod = d['pod'] vif = base.VersionedObject.obj_from_primitive(d['vif']) except KeyError: raise exceptions.ResourceNotReady(pod_name) fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS) return vif
def test_after_attempts(self): TestBeforeAfterAttempts._attempt_number = 0 def _after(attempt_number): TestBeforeAfterAttempts._attempt_number = attempt_number @retry(wait_fixed = 100, stop_max_attempt_number = 3, after_attempts = _after) def _test_after(): if TestBeforeAfterAttempts._attempt_number < 2: raise Exception("testing after_attempts handler") else: pass _test_after() self.assertTrue(TestBeforeAfterAttempts._attempt_number is 2)
def retry_boto_func(func, *args, retryable_error_codes=None, retryable_status_codes=None, retryable_exceptions=None, max_retries=5, retry_wait_time=2000, **kwargs): retriable = retry( stop_max_attempt_number=max_retries, wait_exponential_multiplier=retry_wait_time, retry_on_exception=lambda exc: isinstance(exc, Boto3RetryableException) )(_call_boto_func) return retriable(func, *args, retryable_status_codes=retryable_status_codes, retryable_exceptions=retryable_exceptions, retryable_codes=retryable_error_codes, **kwargs)
def _retry_api_check(exception): """Return True if we should retry. False otherwise. Args: exception: An exception to test for transience. Returns: True if we should retry. False otherwise. """ _print_error('Exception %s: %s' % (type(exception).__name__, str(exception))) if isinstance(exception, apiclient.errors.HttpError): if exception.resp.status in TRANSIENT_HTTP_ERROR_CODES: return True if isinstance(exception, socket.error): if exception.errno in TRANSIENT_SOCKET_ERROR_CODES: return True if isinstance(exception, HttpAccessTokenRefreshError): return True return False
def execute_sql(self, sql, params=None, require_commit=True): @retry(wait_exponential_multiplier=500, wait_exponential_max=10000, stop_max_attempt_number=10, retry_on_exception=self.retry_if_peewee_error) def execute(): try: cursor = super(RetryHarderOperationalError, self) \ .execute_sql(sql, params, require_commit) except (peewee.OperationalError, peewee.InterfaceError), error: print LOG.debug("Retrying after Peewee error: %s", error.message) if not self.is_closed(): self.close() with self.exception_wrapper(): cursor = self.get_cursor() cursor.execute(sql, params or ()) if require_commit and self.get_autocommit(): self.commit() return cursor return execute()
def await_build_ready(self, timeout=5000): """ Wait a specific amount of `timeout` for the blueprint build status to return no errors. The waiting polling interval is fixed at 1sec. Args: timeout (int): timeout to wait in milliseconds Returns: True: when the blueprint contains to build errors False: when the blueprint contains build errors, even after waiting `timeout` """ @retrying.retry(wait_fixed=1000, stop_max_delay=timeout) def wait_for_no_errors(): assert not self.build_errors # noinspection PyBroadException try: wait_for_no_errors() except: return False return True
def test_if_ucr_app_runs_in_new_pid_namespace(dcos_api_session): # We run a marathon app instead of a metronome job because metronome # doesn't support running docker images with the UCR. We need this # functionality in order to test that the pid namespace isolator # is functioning correctly. app, test_uuid = test_helpers.marathon_test_app(container_type=marathon.Container.MESOS) ps_output_file = 'ps_output' app['cmd'] = 'ps ax -o pid= > {}; sleep 1000'.format(ps_output_file) with dcos_api_session.marathon.deploy_and_cleanup(app, check_health=False): marathon_framework_id = dcos_api_session.marathon.get('/v2/info').json()['frameworkId'] app_task = dcos_api_session.marathon.get('/v2/apps/{}/tasks'.format(app['id'])).json()['tasks'][0] # There is a short delay between the `app_task` starting and it writing # its output to the `pd_output_file`. Because of this, we wait up to 10 # seconds for this file to appear before throwing an exception. @retrying.retry(wait_fixed=1000, stop_max_delay=10000) def get_ps_output(): return dcos_api_session.mesos_sandbox_file( app_task['slaveId'], marathon_framework_id, app_task['id'], ps_output_file) assert len(get_ps_output().split()) <= 4, 'UCR app has more than 4 processes running in its pid namespace'
def noparser(bot, update, tags, pages, chat_id, info=None): #Parser without retry loop (to prevent infinte exception) bot.sendChatAction(chat_id, "upload_photo") client = Pybooru('Yandere') randomint = randint(1000, 10000000) try: randompage = randint(1, int(pages)) posts = client.posts_list(tags=str(tags), limit=1, page=str(randompage)) for post in posts: urllib.request.urlretrieve(post['file_url'], "tmp/anime_bot_" + str(randomint) + ".jpg") tmp_data = "Uploader: " + post['author'] + "\nID: " + str(post['id']) globalarray[chat_id] = dict(data=tmp_data) photo = open('tmp/anime_bot_' + str(randomint) + ".jpg", 'rb') reply_markup = ikeyboard if info != None: bot.sendPhoto(chat_id, photo, reply_markup=reply_markup, caption=info + '\n' + tmp_data) os.remove('tmp/anime_bot_' + str(randomint) + ".jpg") else: bot.sendPhoto(chat_id, photo, reply_markup=reply_markup, caption=tmp_data) os.remove('tmp/anime_bot_' + str(randomint) + ".jpg") except Exception as e: print(e)
def visit_page( context: Context, actor_alias: str, page_name: str, *, first_time: bool = False): """Will visit specific page. NOTE: In order for the retry scheme to work properly you should have the webdriver' page load timeout set to value lower than the retry's `wait_fixed` timer, e.g `driver.set_page_load_timeout(time_to_wait=30)` """ if not get_actor(context, actor_alias): add_actor(context, unauthenticated_actor(actor_alias)) context.current_page = get_page_object(page_name) logging.debug( "%s will visit '%s' page using: '%s'", actor_alias, page_name, context.current_page.URL) context.current_page.visit(context.driver, first_time=first_time)
def call(f, exceptions=AssertionError, tries=STOP_MAX_ATTEMPT_NUMBER_DEFAULT): """Call a given function and treat it as eventually consistent. The function will be called immediately and retried with exponential backoff up to the listed amount of times. By default, it only retries on AssertionErrors, but can be told to retry on other errors. For example: @eventually_consistent.call def _(): results = client.query().fetch(10) assert len(results) == 10 """ __tracebackhide__ = True return retry( wait_exponential_multiplier=WAIT_EXPONENTIAL_MULTIPLIER, wait_exponential_max=WAIT_EXPONENTIAL_MAX_DEFAULT, stop_max_attempt_number=tries, retry_on_exception=_retry_on_exception(exceptions))(f)()
def test_django_request_trace(self): requests.get( BASE_URL, headers=self.headers_trace) @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT) def test_with_retry(self): trace = self.client.get_trace(trace_id=self.trace_id) spans = trace.get('spans') self.assertEqual(trace.get('projectId'), PROJECT) self.assertEqual(trace.get('traceId'), str(self.trace_id)) self.assertEqual(len(spans), 1) for span in spans: labels = span.get('labels') self.assertEqual(labels.get('/http/status_code'), '200') test_with_retry(self)
def test_sqlalchemy_mysql_trace(self): requests.get( '{}sqlalchemy_mysql'.format(BASE_URL), headers=self.headers_trace) @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT) def test_with_retry(self): trace = self.client.get_trace(trace_id=self.trace_id) spans = trace.get('spans') self.assertEqual(trace.get('projectId'), PROJECT) self.assertEqual(trace.get('traceId'), str(self.trace_id)) self.assertNotEqual(len(trace.get('spans')), 0) request_succeeded = False for span in spans: labels = span.get('labels') if '/http/status_code' in labels.keys(): self.assertEqual(labels.get('/http/status_code'), '200') request_succeeded = True self.assertTrue(request_succeeded) test_with_retry(self)
def test_flask_request_trace(self): requests.get( BASE_URL, headers=self.headers_trace) @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT) def test_with_retry(self): trace = self.client.get_trace(trace_id=self.trace_id) spans = trace.get('spans') self.assertEqual(trace.get('projectId'), PROJECT) self.assertEqual(trace.get('traceId'), str(self.trace_id)) self.assertEqual(len(spans), 1) for span in spans: labels = span.get('labels') self.assertEqual(labels.get('/http/status_code'), '200') test_with_retry(self)
def test_sqlalchemy_mysql_trace(self): requests.get( '{}sqlalchemy-mysql'.format(BASE_URL), headers=self.headers_trace) @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT) def test_with_retry(self): trace = self.client.get_trace(trace_id=self.trace_id) spans = trace.get('spans') self.assertEqual(trace.get('projectId'), PROJECT) self.assertEqual(trace.get('traceId'), str(self.trace_id)) self.assertNotEqual(len(spans), 0) request_succeeded = False for span in spans: labels = span.get('labels') if '/http/status_code' in labels.keys(): self.assertEqual(labels.get('/http/status_code'), '200') request_succeeded = True self.assertTrue(request_succeeded) test_with_retry(self)
def get_body_text(driver, exponential_multiplier=cfg_wait_exponential_multiplier, exponential_max=cfg_wait_exponential_max, stop_max_attempt=cfg_retry_stop_max_attempt): @retry( wait_exponential_multiplier=exponential_multiplier, wait_exponential_max=exponential_max, stop_max_attempt_number=stop_max_attempt) def _get_body_text(driver): try: e = wait_for_xpath_presence(driver, "//body") except StaleElementReferenceException: a_nice_refresh(driver) e = wait_for_xpath_presence(driver, "//*") raise StaleElementReferenceException return e.get_attribute("outerHTML") return _get_body_text(driver) # Subbornly clicks on the elements which run away from the DOM
def _ensure_cluster_status_set(t): m = t.talk_raw(CMD_INFO) logging.debug('Ask `info` Rsp %s', m) cluster_enabled = PAT_CLUSTER_ENABLED.findall(m) if len(cluster_enabled) == 0 or int(cluster_enabled[0]) == 0: raise hiredis.ProtocolError( 'Node %s:%d is not cluster enabled' % (t.host, t.port)) m = t.talk_raw(CMD_CLUSTER_INFO) logging.debug('Ask `cluster info` Rsp %s', m) cluster_state = PAT_CLUSTER_STATE.findall(m) cluster_slot_assigned = PAT_CLUSTER_SLOT_ASSIGNED.findall(m) if cluster_state[0] != 'ok' and int(cluster_slot_assigned[0]) == 0: raise hiredis.ProtocolError( 'Node %s:%d is not in a cluster' % (t.host, t.port)) # Redis instance responses to clients BEFORE changing its 'cluster_state' # just retry some times, it should become OK
def kibana(host): class Kibana(object): def __init__(self): self.url = 'http://localhost:5601' self.process = host.process.get(comm='node') self.image_flavor = config.getoption('--image-flavor') self.environment = dict( [line.split('=', 1) for line in self.stdout_of('env').split('\n')] ) @retry(**retry_settings) def get(self, location='/', allow_redirects=True): """GET a page from Kibana.""" url = urllib.parse.urljoin(self.url, location) return requests.get(url) def stdout_of(self, command): return host.run(command).stdout.strip() return Kibana()
def test_before_attempts(self): TestBeforeAfterAttempts._attempt_number = 0 def _before(attempt_number): TestBeforeAfterAttempts._attempt_number = attempt_number @retry(wait_fixed = 1000, stop_max_attempt_number = 1, before_attempts = _before) def _test_before(): pass _test_before() self.assertTrue(TestBeforeAfterAttempts._attempt_number is 1)
def retry_boto(max_retries=5, retry_wait_time=2000, retryable_error_codes=None, retryable_status_codes=None, retryable_exceptions=None): def decorator(func): @functools.wraps(func) @retry(stop_max_attempt_number=max_retries, wait_exponential_multiplier=retry_wait_time, retry_on_exception=lambda exc: isinstance(exc, Boto3RetryableException)) def wrapper(*args, **kwargs): return _call_boto_func(func, *args, retryable_status_codes=retryable_status_codes, retryable_exceptions=retryable_exceptions, retryable_codes=retryable_error_codes, **kwargs) return wrapper return decorator
def _retry_download_check(exception): """Return True if we should retry, False otherwise""" print_error('Exception during download: %s' % str(exception)) return isinstance(exception, oauth2client.client.HttpAccessTokenRefreshError) # Exponential backoff retrying downloads of GCS object chunks. # Maximum 23 retries. # Wait 1, 2, 4 ... 64, 64, 64... seconds.
def retry_on_429(exc): """ Used to trigger retry on rate limit """ return isinstance(exc, errors.APIRateLimitError)
def retry_if_rate_limit_error(exception): """Return True if we should retry (in this case when it's a rate_limit error), False otherwise""" return isinstance(exception, RateLimitException)
def retry_if_500_error(exception): """Allow retry if we get a 500 error from IAM API.""" logging.info("Received %s, retrying...", exception) return (isinstance(exception, errors.HttpError) and exception.resp.status >= 500 and exception.resp.status < 600)
def retry_if_io_error(exception): """Return True if IOError. Return True if we should retry (in this case when it's an IOError), False otherwise. """ print "Filesystem error, retrying in 2 seconds..." return isinstance(exception, IOError)
def get(self): @retrying.retry(wait_fixed=1000, stop_max_delay=5000) def get_status(): self.collection.digest() me = self.collection[self.name] assert me.value['status'] return me.value self.datum = get_status() return self
def update(self, device_keys): has_devices = self.get_devices() has_ids = set([dev['id'] for dev in has_devices]) should_ids = has_ids | set(device_keys) diff_ids = has_ids ^ should_ids if not diff_ids: return # nothing to add # need to append to what's already in the pool, # since this is a PUT action for new_id in diff_ids: has_devices.append(dict(id=new_id)) timeout = 3000 @retrying.retry(wait_fixed=1000, stop_max_delay=timeout) def put_updated(): got = self.api.requests.put( self.url, json=dict(display_name='Default Pool', devices=has_devices)) if not got.ok: raise SessionRqstError( message='unable to update approved list: %s' % got.text, resp=got) put_updated()
def retry_if_value_error(exception): """Return True if we should retry (in this case when it's an IOError), False otherwise""" return isinstance(exception, ValueError)
def create_volume(context): logging.debug("Creating volume") _create_volume = retry(wait_exponential_multiplier=1000, stop_max_delay=60000, retry_on_exception=retry_if_throttled)(context["ec2_connection"].create_volume) volume = _create_volume(context["ebs"]["size"], context["az"], volume_type=context["ebs"]["type"]) logging.info("Created volume {0}".format(volume.id)) wait_for_volume_state(volume, "available") return volume
def stop_if_result_none(result): '''stop if result none will return True if we should not retry when result is none, False otherwise using retrying python package ''' do_retry = result is not None return do_retry # Simple default retrying for calls to api
def vagrant_up_with_retry(vagrant_vm): """ vagrant up and retry on errorx """ cmd = 'vagrant up %s --no-provision' % vagrant_vm process = Popen(shlex.split(cmd), stdout=PIPE) process.communicate() exit_code = process.wait() return exit_code
def vagrant_run_with_retry(vagrant_vm, command): """ vagrant run and retry on errorx """ local('vagrant ssh %s -- %s' % (vagrant_vm, command))
def vagrant_halt_with_retry(vagrant_vm): """ vagrant halt and retry on errorx """ cmd = 'vagrant halt %s' % vagrant_vm process = Popen(shlex.split(cmd), stdout=PIPE) process.communicate() exit_code = process.wait() return exit_code
def vagrant_provision_with_retry(vagrant_vm): """ vagrant provision and retry on errorx """ cmd = 'vagrant provision %s' % vagrant_vm process = Popen(shlex.split(cmd), stdout=PIPE) process.communicate() exit_code = process.wait() return exit_code
def execute_and_assert(executor, storage=None): expected_value = 'value' successful_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_successful_task)) ) failing_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_failing_task)) ) task_with_inputs_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_task_with_input), arguments={'input': models.Argument.wrap('input', 'value')}) ) for ctx in [successful_ctx, failing_ctx, task_with_inputs_ctx]: executor.execute(ctx) @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): assert successful_ctx.states == ['start', 'success'] assert failing_ctx.states == ['start', 'failure'] assert task_with_inputs_ctx.states == ['start', 'failure'] assert isinstance(failing_ctx.exception, MockException) assert isinstance(task_with_inputs_ctx.exception, MockException) assert task_with_inputs_ctx.exception.message == expected_value assertion()
def run(self, count): # Get unique number of available TCP ports on the system sshd_ports = [] for try_port in random.sample(range(10000, 11000), count): # If the port is already in use, skip it. while can_connect(try_port): try_port += 1 sshd_ports.append(try_port) # Run sshd servers in parallel, cleaning up when the yield returns. subprocesses = [] for port in sshd_ports: subprocesses.append(subprocess.Popen( ['/usr/sbin/sshd', '-p{}'.format(port), '-f{}'.format(self.sshd_config_path), '-e', '-D'], cwd=str(self.tmpdir))) # Wait for the ssh servers to come up @retry(stop_max_delay=1000, retry_on_result=lambda x: x is False) def check_server(port): return can_connect(port) for port in sshd_ports: check_server(port) yield sshd_ports # Stop all the subproceses. They are ephemeral temporary SSH connections, no point in being nice # with SIGTERM. for s in subprocesses: s.kill()
def test_if_dcos_history_service_is_getting_data(dcos_api_session): @retry(stop_max_delay=20000, wait_fixed=500) def check_up(): r = dcos_api_session.get('/dcos-history-service/history/last') assert r.status_code == 200 # Make sure some basic fields are present from state-summary which the DC/OS # UI relies upon. Their exact content could vary so don't test the value. json = r.json() assert {'cluster', 'frameworks', 'slaves', 'hostname'} <= json.keys() assert len(json["slaves"]) == len(dcos_api_session.all_slaves) check_up()
def test_10_change_plugins(self): """Validate that plugins get updated after a config change.""" plugins = "groovy greenballs" charm_name = self.spec.deployment.charm_name self.spec.deployment.configure(charm_name, {"plugins": plugins}) self.spec.deployment.sentry.wait() @retry(stop_max_attempt_number=10, wait_fixed=1000) def assert_plugins(): plugins = self.spec.plugins_list() self.assertIn("groovy", plugins, "Failed to locate groovy") self.assertIn("greenballs", plugins, "Failed to locate greenballs") assert_plugins()
def wait_for( self, timeout=10000, interval=1000, asserter=lambda x: x): """Wait for element till given condition Support: Android iOS Web(WebView) Args: timeout(int): How long we should be retrying stuff. interval(int): How long between retries. asserter(callable): The asserter func to determine the result. Returns: Return the Element. Raises: WebDriverException. """ if not callable(asserter): raise TypeError('Asserter must be callable.') @retry( retry_on_exception=lambda ex: isinstance(ex, WebDriverException), stop_max_delay=timeout, wait_fixed=interval ) def _wait_for(el): asserter(el) return el return _wait_for(self)
def wait_for_element( self, using, value, timeout=10000, interval=1000, asserter=is_displayed): """Wait for element till the given condition Support: Android iOS Web(WebView) Args: using(str): The element location strategy. value(str): The value of the location strategy. timeout(int): How long we should be retrying stuff. interval(int): How long between retries. asserter(callable): The asserter func to determine the result. Returns: Return the Element. Raises: WebDriverException. """ if not callable(asserter): raise TypeError('Asserter must be callable.') @retry( retry_on_exception=lambda ex: isinstance(ex, WebDriverException), stop_max_delay=timeout, wait_fixed=interval ) def _wait_for_element(ctx, using, value): el = ctx.element(using, value) asserter(el) return el return _wait_for_element(self, using, value)
def wait_for_elements( self, using, value, timeout=10000, interval=1000, asserter=is_displayed): """Wait for elements till the given condition Support: Android iOS Web(WebView) Args: using(str): The element location strategy. value(str): The value of the location strategy. timeout(int): How long we should be retrying stuff. interval(int): How long between retries. asserter(callable): The asserter func to determine the result. Returns: Return the list of Element if any of them satisfy the condition. Raises: WebDriverException. """ if not callable(asserter): raise TypeError('Asserter must be callable.') @retry( retry_on_exception=lambda ex: isinstance(ex, WebDriverException), stop_max_delay=timeout, wait_fixed=interval ) def _wait_for_elements(ctx, using, value): els = ctx.elements(using, value) if not len(els): raise WebDriverException('no such element') else: el = els[0] asserter(el) return els return _wait_for_elements(self, using, value)
def wait_for( self, timeout=10000, interval=1000, asserter=lambda x: x): """Wait for driver till satisfy the given condition Support: Android iOS Web(WebView) Args: timeout(int): How long we should be retrying stuff. interval(int): How long between retries. asserter(callable): The asserter func to determine the result. Returns: Return the driver. Raises: WebDriverException. """ if not callable(asserter): raise TypeError('Asserter must be callable.') @retry( retry_on_exception=lambda ex: isinstance(ex, WebDriverException), stop_max_delay=timeout, wait_fixed=interval ) def _wait_for(driver): asserter(driver) return driver return _wait_for(self)
def wait_for_element( self, using, value, timeout=10000, interval=1000, asserter=is_displayed): """Wait for element till satisfy the given condition Support: Android iOS Web(WebView) Args: using(str): The element location strategy. value(str): The value of the location strategy. timeout(int): How long we should be retrying stuff. interval(int): How long between retries. asserter(callable): The asserter func to determine the result. Returns: Return the Element. Raises: WebDriverException. """ if not callable(asserter): raise TypeError('Asserter must be callable.') @retry( retry_on_exception=lambda ex: isinstance(ex, WebDriverException), stop_max_delay=timeout, wait_fixed=interval ) def _wait_for_element(ctx, using, value): el = ctx.element(using, value) asserter(el) return el return _wait_for_element(self, using, value)
def get_geolocation(geocode_obj, loc): """ Wrapper function around geopy's geocode function. Used for retry, which will run it at most 3 times to get a non-error return value. It will not retry if it successfully returns a value. """ loc = geocode_obj.geocode(loc) return loc
def mark(f): """Marks an entire test as eventually consistent and retries.""" __tracebackhide__ = True return retry( wait_exponential_multiplier=WAIT_EXPONENTIAL_MULTIPLIER, wait_exponential_max=WAIT_EXPONENTIAL_MAX_DEFAULT, stop_max_attempt_number=STOP_MAX_ATTEMPT_NUMBER_DEFAULT, retry_on_exception=_retry_on_exception( (AssertionError, exceptions.GoogleCloudError)))(f)
def run_query(client, base_query, start_record, limit_to, verbose=False): """inline method to take advantage of retry""" if verbose: print("[start: %d limit: %d]" % (start_record, limit_to)) start = datetime.datetime.now() result = client.runSQL(query=base_query, start_record=start_record, limit_to=limit_to) end = datetime.datetime.now() if verbose: print("[%s - %s]" % (start, end)) return result
def read_session(function): ''' decorator that set the session variable to use inside a function. With that decorator it's possible to use the session variable like if a global variable session is declared. session is a sqlalchemy session, and you can get one calling get_session(). This is useful if only SELECTs and the like are being done; anything involving INSERTs, UPDATEs etc should use transactional_session. ''' @retry(retry_on_exception=retry_if_db_connection_error, wait_fixed=0.5, stop_max_attempt_number=2, wrap_exception=False) @wraps(function) def new_funct(*args, **kwargs): if isgeneratorfunction(function): raise RucioException('read_session decorator should not be used with generator. Use stream_session instead.') if not kwargs.get('session'): session = get_session() try: kwargs['session'] = session return function(*args, **kwargs) except TimeoutError, error: session.rollback() # pylint: disable=maybe-no-member raise DatabaseException(str(error)) except DatabaseError, error: session.rollback() # pylint: disable=maybe-no-member raise DatabaseException(str(error)) except: session.rollback() # pylint: disable=maybe-no-member raise finally: session.remove() try: return function(*args, **kwargs) except: raise new_funct.__doc__ = function.__doc__ return new_funct
def sensor_supports_humidity(self): """Determine if the sensor supports relative humidity measurements. :return: does the sensor support relative humidity measurements? """ return self.is_sensor_present and 'humid' in self.sensor_type.lower() # pylint: disable=no-self-argument # In order to use this method within the @retry decorator, this method # must be defined as such.