我们从 Python 开源项目中提取了以下 17 个代码示例,用于说明如何使用 redis.WatchError()。
def test_aquire_lease_redis_available_lose_secondary(self, mock_settings, mock_time, mock_get_secondary_cache_source, mock_get_connection):
    # Scenario: nothing is stored under the lease key, but the transaction's
    # execute() raises redis.WatchError, so acquire_lease(..., primary=False)
    # is expected to return a falsy value.
    mock_time.time.return_value = 999.
    mock_settings.ENDPOINTS = {}
    mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
    mock_get_secondary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)

    # `pipe` is the object bound by `with connection.pipeline() as pipe:`.
    pipeline_cm = mock_get_connection.return_value.pipeline.return_value
    pipe = pipeline_cm.__enter__.return_value
    pipe.get.return_value = None
    pipe.execute.side_effect = redis.WatchError

    acquired = acquire_lease('a', 1, 1, primary=False)

    self.assertFalse(acquired)
    # The full WATCH / GET / MULTI / SETEX / EXEC sequence must still have
    # been issued before the conflict surfaced.
    pipe.watch.assert_called_with('lease-a')
    pipe.get.assert_called_with('lease-a')
    pipe.multi.assert_called_with()
    pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
    pipe.execute.assert_called_with()
def test_aquire_lease_redis_available_lose(self, mock_settings, mock_time, mock_get_primary_cache_source, mock_get_connection):
    # Scenario: nothing is stored under the lease key, but the transaction's
    # execute() raises redis.WatchError, so acquire_lease() is expected to
    # return a falsy value.
    mock_time.time.return_value = 999.
    mock_settings.ENDPOINTS = {}
    mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
    mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)

    # `pipe` is the object bound by `with connection.pipeline() as pipe:`.
    pipeline_cm = mock_get_connection.return_value.pipeline.return_value
    pipe = pipeline_cm.__enter__.return_value
    pipe.get.return_value = None
    pipe.execute.side_effect = redis.WatchError

    acquired = acquire_lease('a', 1, 1)

    self.assertFalse(acquired)
    # The full WATCH / GET / MULTI / SETEX / EXEC sequence must still have
    # been issued before the conflict surfaced.
    pipe.watch.assert_called_with('lease-a')
    pipe.get.assert_called_with('lease-a')
    pipe.multi.assert_called_with()
    pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
    pipe.execute.assert_called_with()
def test_aquire_lease_redis_leased_expired_lose(self, mock_settings, mock_time, mock_get_primary_cache_source, mock_get_connection):
    # Scenario: the lease key already holds '99:99:0:99' (presumably an
    # expired lease -- the third field is 0), and the transaction's execute()
    # raises redis.WatchError, so acquire_lease() is expected to return a
    # falsy value.
    mock_time.time.return_value = 999.
    mock_settings.ENDPOINTS = {}
    mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
    mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)

    # `pipe` is the object bound by `with connection.pipeline() as pipe:`.
    pipeline_cm = mock_get_connection.return_value.pipeline.return_value
    pipe = pipeline_cm.__enter__.return_value
    pipe.get.return_value = '99:99:0:99'
    pipe.execute.side_effect = redis.WatchError

    acquired = acquire_lease('a', 1, 1)

    self.assertFalse(acquired)
    # The takeover write is expected to carry a last field of 100, i.e. one
    # more than the 99 found in the stored value.
    pipe.watch.assert_called_with('lease-a')
    pipe.get.assert_called_with('lease-a')
    pipe.multi.assert_called_with()
    pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:100')
    pipe.execute.assert_called_with()
def test_release_lease_redis_owned_self_loses(self, mock_settings, mock_get_primary_cache_source, mock_get_connection):
    # Scenario: the stored lease value '99:99:99:99' matches the arguments
    # passed to release_lease(), but the transaction's execute() raises
    # redis.WatchError, so release_lease() is expected to return a falsy value.
    mock_settings.ENDPOINTS = {}
    mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
    mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)

    # `pipe` is the object bound by `with connection.pipeline() as pipe:`.
    pipeline_cm = mock_get_connection.return_value.pipeline.return_value
    pipe = pipeline_cm.__enter__.return_value
    pipe.get.return_value = '99:99:99:99'
    pipe.execute.side_effect = redis.WatchError

    released = release_lease('a', 99, 99, 99)

    self.assertFalse(released)
    # The release write ('-1:-1:0:99') must have been queued before the
    # conflict surfaced at execute().
    pipe.watch.assert_called_with('lease-a')
    pipe.get.assert_called_with('lease-a')
    pipe.multi.assert_called_with()
    pipe.setex.assert_called_with('lease-a', 86400, '-1:-1:0:99')
    pipe.execute.assert_called_with()
def _batch_insert_prob_redis(conn, names, all_hashes, colour, count=0):
    """Set bit ``colour`` in the bit array stored at each (name, hash) field.

    The current field values are read with ``get_vals`` while the keys in
    ``names`` are WATCHed; the updated bit arrays are then written back with
    HSET inside a MULTI/EXEC transaction.

    Args:
        conn: Redis client the pipeline is created from.
        names: Hash key names; iterated in lockstep with ``all_hashes``.
        all_hashes: For each name, the iterable of hash fields to update.
        colour: Index of the bit to set to 1 in every touched bit array.
        count: Number of retries already performed (used by the recursion).

    On ``redis.WatchError`` the whole batch is retried, up to 5 times; after
    that a warning is logged and the function returns without raising.
    """
    with conn.pipeline() as pipe:
        try:
            # WATCH takes each key as a separate argument; passing the list
            # itself would watch a single bogus key (or be rejected).
            watched = list(names)
            if watched:
                pipe.watch(*watched)
            vals = get_vals(conn, names, all_hashes)
            pipe.multi()
            for name, values, hs in zip(names, vals, all_hashes):
                for val, h in zip(values, hs):
                    ba = BitArray()
                    if val is None:
                        # Missing field: start from an empty bit array.
                        val = b''
                    ba.frombytes(val)
                    ba.setbit(colour, 1)
                    pipe.hset(name, h, ba.tobytes())
            pipe.execute()
        except redis.WatchError:
            # Log the full key list: the per-key loop variable may be unbound
            # here and would only name the last key anyway.
            logger.warning("Retrying %s %s " % (conn, names))
            if count < 5:
                # Retry this same function; the original referenced an
                # undefined `self` / `hk` and raised NameError instead.
                _batch_insert_prob_redis(
                    conn, names, all_hashes, colour, count=count + 1)
            else:
                logger.warning(
                    "Failed %s %s. Too many retries. Contining regardless." % (conn, names))
def get_current_task(self):
    """Return ``(task_id, task_data)``, refreshing the cache when the id changed.

    The id stored at ``TASK_ID_KEY`` is read under WATCH. If it equals
    ``self.cached_task_id`` the cached pair is returned untouched; otherwise
    the value at ``TASK_DATA_KEY`` is fetched in a MULTI/EXEC transaction,
    deserialized and stored on ``self``. A ``redis.WatchError`` restarts the
    whole read.
    """
    with self.local_redis.pipeline() as txn:
        while True:
            try:
                txn.watch(TASK_ID_KEY)
                current_id = int(retry_get(txn, TASK_ID_KEY))
                if current_id != self.cached_task_id:
                    txn.multi()
                    txn.get(TASK_DATA_KEY)
                    logger.info('[worker] Getting new task {}. Cached task was {}'.format(current_id, self.cached_task_id))
                    # Only touch the cache once EXEC has succeeded.
                    fresh_data = deserialize(txn.execute()[0])
                    self.cached_task_id = current_id
                    self.cached_task_data = fresh_data
                else:
                    logger.debug('[worker] Returning cached task {}'.format(current_id))
            except redis.WatchError:
                # The task id changed mid-transaction; start over.
                continue
            else:
                break
    return self.cached_task_id, self.cached_task_data
def incr(self, key, amount, maximum, ttl):
    """Add ``amount`` to the integer at ``key`` unless it would exceed ``maximum``.

    A missing key counts as 0. The new value is written with ``px=ttl``
    inside a WATCH/MULTI/EXEC transaction, retried on ``redis.WatchError``.

    Returns:
        True if the value was written, False if the limit would be exceeded
        (in which case nothing is written).
    """
    with self.client.pipeline() as txn:
        while True:
            try:
                txn.watch(key)
                stored = txn.get(key)
                updated = int(stored or b"0") + amount
                if updated > maximum:
                    return False
                txn.multi()
                txn.set(key, updated, px=ttl)
                txn.execute()
            except redis.WatchError:
                # Someone else modified the key; recompute from scratch.
                continue
            else:
                return True
def decr(self, key, amount, minimum, ttl):
    """Subtract ``amount`` from the integer at ``key`` unless it would drop below ``minimum``.

    A missing key counts as 0. The new value is written with ``px=ttl``
    inside a WATCH/MULTI/EXEC transaction, retried on ``redis.WatchError``.

    Returns:
        True if the value was written, False if the result would be below
        ``minimum`` (in which case nothing is written).
    """
    with self.client.pipeline() as txn:
        while True:
            try:
                txn.watch(key)
                stored = txn.get(key)
                updated = int(stored or b"0") - amount
                if updated < minimum:
                    return False
                txn.multi()
                txn.set(key, updated, px=ttl)
                txn.execute()
            except redis.WatchError:
                # Someone else modified the key; recompute from scratch.
                continue
            else:
                return True
def incr_and_sum(self, key, keys, amount, maximum, ttl):
    """Increment ``key`` by ``amount`` if neither the key nor the group total exceeds ``maximum``.

    Two limits are checked while ``key`` and every entry of ``keys`` are
    WATCHed:

    1. the value at ``key`` (missing counts as 0) plus ``amount``;
    2. ``amount`` plus the sum of all truthy values fetched for ``keys``.

    If both stay within ``maximum`` the new value of ``key`` is written with
    ``px=ttl`` in a MULTI/EXEC transaction, retried on ``redis.WatchError``.

    Returns:
        True if the value was written, False if either limit would be
        exceeded (in which case nothing is written).
    """
    with self.client.pipeline() as txn:
        while True:
            try:
                txn.watch(key, *keys)
                updated = int(txn.get(key) or b"0") + amount
                if updated > maximum:
                    return False
                # Missing/empty entries are skipped rather than counted as 0.
                present = filter(None, txn.mget(keys))
                group_total = amount + sum(map(int, present))
                if group_total > maximum:
                    return False
                txn.multi()
                txn.set(key, updated, px=ttl)
                txn.execute()
            except redis.WatchError:
                # One of the watched keys changed; recompute from scratch.
                continue
            else:
                return True
def save_enqueued(self, pipe):
    """
    Build a job in QUEUED status and stage it for enqueueing.

    All writes are issued through `pipe`; per the original author's note,
    nothing is done if a WatchError happens during the next
    `pipeline.execute()`. The created job is stored on `self.job`.
    """
    queued_job = self.create_job(status=JobStatus.QUEUED)
    self.set_job_params(pipeline=pipe)

    queued_job.origin = self.origin
    queued_job.enqueued_at = utcnow()
    # Fall back to this object's timeout only when the job has none.
    if queued_job.timeout is None:
        queued_job.timeout = self.timeout

    queued_job.save(pipeline=pipe)
    self.job = queued_job
def save_deferred(self, depends_on, pipe):
    """
    Build a job in DEFERRED status that depends on `depends_on` and stage it.

    The dependency registration and the save are both issued through `pipe`;
    per the original author's note, nothing is done if a WatchError happens
    during the next `pipeline.execute()`.

    Returns the created job.
    """
    deferred_job = self.create_job(
        depends_on=depends_on,
        status=JobStatus.DEFERRED,
    )
    self.set_job_params(pipeline=pipe)

    deferred_job.register_dependency(pipeline=pipe)
    deferred_job.save(pipeline=pipe)
    return deferred_job
def create(self, redis):
    """
    Create the job by writing its context (the `meta_keys`) to redis.

    One meta key exists per host. Conflict detection is an EXISTS check on
    every meta key while all of them are WATCHed; if none exists, each key is
    created with HMSET (`startat` field) inside a MULTI/EXEC transaction, so
    a concurrent writer touching any meta key aborts the transaction.

    Args:
        redis: redis client used to open the pipeline.

    Raises:
        JobConflictError: if a meta key already exists, or if a watched key
            changed before EXEC (WatchError).
    """
    pipeline = redis.pipeline()
    try:
        # WATCH takes each key as its own argument; passing the list object
        # would not watch the individual meta keys.
        if self.meta_keys:
            pipeline.watch(*self.meta_keys)
        for key in self.meta_keys:
            # While watching, commands run immediately, so this is a real
            # EXISTS result rather than a queued command.
            if pipeline.exists(key):
                raise JobConflictError("operate conflict, job already exists on some host(s)")
        LOG.info("going to create job meta data <{0}>".format(';'.join(self.meta_keys)))
        pipeline.multi()
        for key in self.meta_keys:
            pipeline.hmset(key, dict(startat=self._startat))
        pipeline.execute()
        LOG.info("job meta data create finished, <{0}>".format(';'.join(self.meta_keys)))
    except WatchError:
        LOG.info("conflict detected on job meta data create <{0}>".format(';'.join(self.meta_keys)))
        raise JobConflictError("operate conflict, try again later")
    finally:
        # Always drop the WATCH and release the pipeline.
        pipeline.reset()
def get_delayed_writes(self, delayed_write_key):
    """
    Atomically drain the list of delayed write-cuboid keys for one delayed_write_key

    The list is read with LRANGE and then deleted, together with its
    "RESOURCE-<delayed_write_key>" companion key, in a single WATCH/MULTI/EXEC
    transaction.

    Args:
        delayed_write_key (str): key of the list to drain

    Returns:
        list(str): Decoded delayed write-cuboid keys; empty if the watched key
            changed before the transaction ran (WatchError)

    Raises:
        SpdbError: on any other failure
    """
    with self.status_client.pipeline() as txn:
        try:
            # Read and clean up in one transaction so other procs can't add anything in between
            txn.watch(delayed_write_key)
            txn.multi()
            txn.lrange(delayed_write_key, 0, -1)
            txn.delete(delayed_write_key)
            txn.delete("RESOURCE-{}".format(delayed_write_key))
            replies = txn.execute()

            # First reply is the LRANGE result; its entries are encoded
            return [raw_key.decode() for raw_key in replies[0]]
        except redis.WatchError:
            # Just bail out and let the daemon pick this up later.
            return []
        except Exception as e:
            raise SpdbError("An error occurred while attempting to retrieve delay-write keys: \n {}".format(e),
                            ErrorCodes.REDIS_ERROR)
def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
    """Release the GPUs that a given worker was using.

    Decrements the driver's entry in the JSON object stored in the
    "gpus_in_use" field of the ``local_scheduler_id`` hash by the number of
    GPU IDs being released. Per the original note, this only affects the GPU
    allocations recorded in the primary Redis shard, not the local
    scheduler's own bookkeeping.

    Args:
        driver_id: The ID of the driver that is releasing some GPUs.
        local_scheduler_id: The ID of the local scheduler that owns the GPUs
            being released.
        gpu_ids: The IDs of the GPUs being released.
        redis_client: A client for the primary Redis shard.
    """
    with redis_client.pipeline() as txn:
        while True:
            try:
                # Any change to this key before EXEC aborts the transaction.
                txn.watch(local_scheduler_id)

                # Load the current per-driver usage counts.
                raw = redis_client.hget(local_scheduler_id, "gpus_in_use")
                if raw is None:
                    usage = dict()
                else:
                    usage = json.loads(raw.decode("ascii"))

                assert driver_id in usage
                assert usage[driver_id] >= len(gpu_ids)
                usage[driver_id] -= len(gpu_ids)

                txn.multi()
                txn.hset(local_scheduler_id, "gpus_in_use", json.dumps(usage))
                txn.execute()
            except redis.WatchError:
                # Another client changed the watched key between WATCH and
                # EXEC; retry with fresh data.
                continue
            else:
                # No WatchError: the update went through atomically.
                break
def _release_lease_redis(cache_arn, correlation_id, steps, retries, fence_token):
    """
    Releases a lease from redis.

    The lease is only released when the stored value's steps, retries and
    fence token all equal the arguments. Releasing overwrites the value with
    an "unowned" one (steps/retries = -1, time = 0, same fence token) via
    SETEX inside a WATCH/MULTI/EXEC transaction.

    Returns True when released; False when there is no lease value, it is
    owned by something else, or a WatchError occurred; 0 on a redis
    connection error; None when no connection could be obtained.
    """
    import redis
    connection = get_connection(cache_arn)
    if not connection:
        return  # pragma: no cover

    with connection.pipeline() as txn:
        try:
            # read the current lease value under a watch
            lease_key = LEASE_DATA.LEASE_KEY_PREFIX + correlation_id
            txn.watch(lease_key)
            stored_value = txn.get(lease_key)
            txn.multi()

            # the lease is no longer owned by anyone
            if not stored_value:
                return False

            held_steps, held_retries, held_time, held_fence_token = \
                _deserialize_lease_value(stored_value)

            # something else owns the lease, so we can't release it
            if (held_steps, held_retries, held_fence_token) != (steps, retries, fence_token):
                return False

            # mark as unowned and expired, keeping the fence token so it can
            # be incremented later
            unowned_value = _serialize_lease_value(-1, -1, 0, fence_token)
            txn.setex(lease_key, LEASE_DATA.LEASE_CLEANUP_TIMEOUT, unowned_value)

            txn.execute()
            return True

        except redis.WatchError:
            return False

        except redis.exceptions.ConnectionError:
            logger.exception('')
            return 0
def _bump_timestamp(self, collection_id, parent_id, record=None, modified_field=None, last_modified=None):
    """Advance the timestamp stored for a collection and return the record's timestamp.

    The candidate timestamp is, in order of preference, ``last_modified``,
    ``record[modified_field]``, or ``utils.msec_time()``. The stored
    collection timestamp becomes the candidate, or the previous stored value
    plus one when the candidate is not greater than it.

    Returns the stored collection timestamp when no timestamp was specified
    or when the specified one equals the previous stored value; otherwise
    the specified timestamp. Retries on ``redis.WatchError``.
    """
    key = '{0}.{1}.timestamp'.format(collection_id, parent_id)
    while True:
        with self._client.pipeline() as txn:
            try:
                txn.watch(key)
                stored = txn.get(key)
                txn.multi()

                # XXX factorize code from memory and redis backends.
                in_record = record is not None and modified_field in record
                is_specified = in_record or last_modified is not None

                # Pick the candidate: explicit argument wins over the
                # record's own field, which wins over the clock.
                if last_modified is not None:
                    current = last_modified
                elif is_specified:
                    current = record[modified_field]
                else:
                    current = utils.msec_time()

                stored_ts = int(stored) if stored else None

                # Never let the collection timestamp stand still or go back.
                if stored_ts is not None and stored_ts >= current:
                    collection_timestamp = stored_ts + 1
                else:
                    collection_timestamp = current

                # Report the generated timestamp only if nothing else was
                # specified, or the specified one collided with the stored one.
                collided = stored_ts is not None and stored_ts == current
                if collided or not is_specified:
                    current = collection_timestamp

                txn.set(key, collection_timestamp)
                txn.execute()
                return current
            except redis.WatchError:  # pragma: no cover
                # Our timestamp has been modified by someone else, let's
                # retry.
                # XXX: untested.
                continue
def add_to_page_out(self, temp_page_out_key, lookup_key, resolution, morton, time_sample):
    """
    Method to add a key to the page-out tracking set

    The member "<time_sample>&<morton>" is added to a temporary set (given a
    15 second expiry), diffed against the page-out set, and then added to
    the page-out set, all in one WATCH/MULTI/EXEC transaction. An empty diff
    means the member was already in the page-out set.

    Args:
        temp_page_out_key (str): key of the temporary set used for the diff
        lookup_key (str): Lookup key for a channel
        resolution (int): level in the resolution heirarchy
        morton (int): morton id for the cuboid
        time_sample (int): time sample for cuboid

    Returns:
        bool: True if the key was in page out already, False otherwise

    Raises:
        SpdbError: after more than 200 WatchError retries, or on any other
            failure
    """
    page_out_key = "PAGE-OUT&{}&{}".format(lookup_key, resolution)
    attempts = 0

    with self.status_client.pipeline() as txn:
        while True:
            try:
                member = "{}&{}".format(time_sample, morton)

                txn.watch(page_out_key)
                txn.multi()
                # Create temp set
                txn.sadd(temp_page_out_key, member)
                txn.expire(temp_page_out_key, 15)
                txn.sdiff(temp_page_out_key, page_out_key)
                txn.sadd(page_out_key, member)
                replies = txn.execute()

                # Third reply is the SDIFF result
                return len(replies[2]) == 0
            except redis.WatchError as e:
                # Watch error occurred, try again!
                attempts += 1
                if attempts > 200:
                    raise SpdbError("Failed to add to page out due to timeout. {}".format(e),
                                    ErrorCodes.REDIS_ERROR)
                continue
            except Exception as e:
                raise SpdbError("Failed to check page-out set. {}".format(e), ErrorCodes.REDIS_ERROR)