我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用redis.call()。
def gather_stage(harvester, job):
    '''Run the harvester's gather stage for ``job`` and return its result.

    ``job.gather_started`` is stamped before delegating to
    ``harvester.gather_stage(job)``. Whatever the outcome,
    ``job.gather_finished`` is stamped and the job is saved.

    If the harvester raises (``KeyboardInterrupt`` included), every
    ``HarvestObject`` whose ``harvest_job_id`` matches this job is deleted
    and the session committed before the exception is re-raised.

    Kept separate from gather_callback so that tests can call it without
    dealing with queue stuff.
    '''
    job.gather_started = datetime.datetime.utcnow()

    try:
        object_ids = harvester.gather_stage(job)
    except (Exception, KeyboardInterrupt):
        # Remove the harvest objects attached to this job, then let the
        # original error propagate to the caller.
        session = model.Session
        job_objects = session.query(HarvestObject).filter_by(
            harvest_job_id=job.id
        )
        for job_object in job_objects:
            session.delete(job_object)
        session.commit()
        raise
    finally:
        # Runs on success and on failure alike.
        job.gather_finished = datetime.datetime.utcnow()
        job.save()

    return object_ids
def filter(self, values: set) -> set:
    """Return the members of ``values`` not found in the processed set.

    Each value is encoded with ``self.codec.dumps`` and checked with
    SISMEMBER against the Redis set ``"<self.name>:tasks:processed"`` inside
    a single Lua script. Values the script reports as members are decoded
    with ``self.codec.loads`` and removed from the result.

    :param values: candidate values; must be a ``set``.
    :return: a new set holding the values that are not members; an empty set
        when ``values`` is empty.
    :raises ValueError: if ``values`` is not a ``set``.
    """
    if not isinstance(values, set):
        raise ValueError("values must be a set")
    if not values:
        # Nothing to look up. Return an empty set explicitly rather than
        # falling off the end of the function and handing back None.
        return set()

    key = "{}:tasks:processed".format(self.name)
    lua = """
    local results = {}
    for _, e in pairs(ARGV) do
        local x = redis.call('sismember', KEYS[1], e)
        if x == 1 then
            table.insert(results, e)
        end
    end
    return results
    """
    script = self.db.register_script(lua)
    results = script(keys=[key], args={self.codec.dumps(v) for v in values})
    return values - {self.codec.loads(e) for e in results}
def queue_purge(self, queue=None):
    '''
    Empty the consumer's queue and return what the purge script reports.

    The script pops entries from the list at ``self.routing_key`` until it is
    empty. Each entry is JSON-decoded, the field named by
    ``self.message_key`` is read from it, and the key
    ``"<routing_key>:<id>"`` is deleted. The script returns the number of
    entries popped.

    The ``queue`` parameter exists only for compatibility and is ignored.
    '''
    # Doing the whole loop inside one script keeps the operation atomic.
    purge_source = b'''
        local routing_key = KEYS[1]
        local message_key = ARGV[1]
        local count = 0
        while true do
            local s = redis.call("lpop", routing_key)
            if s == false then
                break
            end
            local value = cjson.decode(s)
            local id = value[message_key]
            local persistance_key = routing_key .. ":" .. id
            redis.call("del", persistance_key)
            count = count + 1
        end
        return count
    '''
    purge = self.redis.register_script(purge_source)
    script_keys = [self.routing_key]
    script_args = [self.message_key]
    return purge(keys=script_keys, args=script_args)
def __call__(self, record_id, unique_key=None):
    """Look up one record by id, supporting the DAL shortcut ``table(record_id)``.

    The query object obtained from ``self.id`` is configured as an equality
    test on ``record_id`` (carrying ``unique_key`` along) and then selected.

    Returns the first selected row, or an empty ``Storage()`` when the
    selection is empty.
    """
    # Reading ``self.id`` goes through __getattr__ (defined further down in
    # the class, per the original comment) and yields a MockQuery.
    query = self.id

    # Make the MockQuery behave as db(table.id == record_id).
    query.op = 'eq'
    query.value = record_id
    query.unique_key = unique_key

    rows = query.select()
    if rows:
        return rows[0]
    return Storage()
def __init__(
    self,
    expiry=60,
    hosts=None,
    prefix="asgi:",
    group_expiry=86400,
    capacity=100,
    channel_capacity=None,
    symmetric_encryption_keys=None,
    stats_prefix="asgi-meta:",
    connection_kwargs=None,
):
    """Initialise the layer and prepare one connection slot per host.

    All options are forwarded unchanged to the parent initialiser. After
    that the host list is normalised through ``_setup_hosts``, the ring size
    is recorded, the connection list is built with ``_generate_connections``
    (an empty dict is passed as ``redis_kwargs`` when ``connection_kwargs``
    is falsy), two independent cycling index generators over the hosts are
    created, and ``_register_scripts`` is called.
    """
    parent_options = dict(
        expiry=expiry,
        hosts=hosts,
        prefix=prefix,
        group_expiry=group_expiry,
        capacity=capacity,
        channel_capacity=channel_capacity,
        symmetric_encryption_keys=symmetric_encryption_keys,
        stats_prefix=stats_prefix,
        connection_kwargs=connection_kwargs,
    )
    super(RedisChannelLayer, self).__init__(**parent_options)

    self.hosts = self._setup_hosts(hosts)
    # Precalculate some values for ring selection
    self.ring_size = len(self.hosts)

    # Create connections ahead of time (they won't call out just yet, but
    # we want to connection-pool them later)
    redis_options = connection_kwargs if connection_kwargs else {}
    self._connection_list = self._generate_connections(
        self.hosts,
        redis_kwargs=redis_options,
    )

    # Separate cycles so that receiving and sending advance independently.
    host_count = len(self.hosts)
    self._receive_index_generator = itertools.cycle(range(host_count))
    self._send_index_generator = itertools.cycle(range(host_count))

    self._register_scripts()

### Setup ###
def clear_date(date):
    """Clear the Redis data held for ``date``.

    The connection group is looked up with ``getconn(date)``. ``flush_all``
    is then run against every connection in ``conn.conns``, one thread per
    connection, and the function returns once all threads have finished.

    NOTE(review): ``flush_all`` issues FLUSHDB, which empties the whole
    database rather than only the keys matching ``*_<date>*``. The two
    pattern-scoped helpers below are defined but not wired in; confirm which
    strategy is intended before switching the thread target.
    """
    conn = getconn(date)
    pattern = '*_{}*'.format(date)

    # The print calls use the single-argument form so the module parses on
    # Python 3 while printing exactly the same text on Python 2.

    def flush_all(r):
        # Strategy currently in use: drop everything in the database.
        print('clearing {} on {}'.format(r, date))
        r.flushdb()

    def clear_conn_lua(r):
        # Unused alternative: delete the matching keys inside one script.
        script = '''
        for _, k in ipairs(redis.call('keys', ARGV[1])) do
            redis.call('del', k)
        end
        '''
        r.eval(script, 0, pattern)

    def clear_conn(r):
        # Unused alternative: SCAN for matching keys and queue the deletes
        # on a pipeline that is executed once the scan has completed.
        print('clearing {} on {}'.format(r, date))
        p = r.pipeline()
        cursor = 0
        while True:
            cursor, keys = r.scan(cursor, match=pattern, count=100000)
            print('curosr {}, keys {}'.format(cursor, len(keys)))
            if len(keys):
                p.delete(*keys)
            if int(cursor) == 0:
                break
        p.execute()

    tasks = []
    for r in conn.conns:
        t = threading.Thread(target=flush_all, args=(r,))
        t.start()
        tasks.append(t)
    for t in tasks:
        t.join()
def deleteBanchoSessions(self):
    """
    Remove all `peppy:sessions:*` redis keys.
    Call at bancho startup to delete old cached sessions

    The keys are deleted one by one inside a single server-side script.
    Redis errors are swallowed, so this is a best-effort cleanup.

    :return:
    """
    try:
        # TODO: Make function or some redis meme
        # Delete key by key instead of `del` + `unpack(keys)`: with no
        # matching keys that form calls DEL without arguments (an error),
        # and unpacking a very large key list can fail inside Lua. Both
        # failures used to be hidden by the except clause below, leaving
        # stale sessions behind.
        glob.redis.eval(
            "for _, key in ipairs(redis.call('keys', ARGV[1])) do "
            "redis.call('del', key) "
            "end",
            0,
            "peppy:sessions:*",
        )
    except redis.RedisError:
        # Deliberately best effort: startup must not fail because the
        # session cleanup could not run.
        pass
def script_load(script):
    """Return a callable that loads ``script`` once and then runs it by SHA.

    The returned function has the signature
    ``call(conn, keys=(), args=(), force_eval=False)``:

    * ``conn`` is the connection used to execute the commands.
    * ``keys`` are the keys the script will manipulate, and ``args`` any
      other arguments. Both may be any iterable (list, tuple, ...).
    * ``force_eval`` skips SCRIPT LOAD / EVALSHA and sends the script text
      with EVAL directly.

    A ``redis.exceptions.ResponseError`` that does not start with
    ``NOSCRIPT`` is re-raised. A ``NOSCRIPT`` error falls back to EVAL.
    """
    # Store the cached SHA1 hash returned by SCRIPT LOAD in a list so we can
    # change it later from within the call() function.
    sha = [None]

    def call(conn, keys=(), args=(), force_eval=False):
        # Immutable defaults avoid sharing one list between calls, and
        # normalising to lists lets callers mix tuples and lists freely.
        keys = list(keys)
        params = keys + list(args)

        if not force_eval:
            # Only load the script if we don't already have a cached SHA1.
            if not sha[0]:
                sha[0] = conn.execute_command(
                    "SCRIPT", "LOAD", script, parse="LOAD")

            try:
                # Execute the command from the cached SHA1.
                return conn.execute_command(
                    "EVALSHA", sha[0], len(keys), *params)

            except redis.exceptions.ResponseError as msg:
                # If the error was unrelated to a missing script, re-raise.
                if not msg.args[0].startswith("NOSCRIPT"):
                    raise

        # Either the server did not know the script or we were asked to
        # force-execute it: send the script text itself, which also caches
        # it on the server under the same SHA1 that we've already cached.
        return conn.execute_command(
            "EVAL", script, len(keys), *params)

    # The function that automatically loads and executes the script.
    return call
# <end id="script-load"/>
def fetch_and_import_stages(harvester, obj):
    """Run the fetch stage and, if it succeeds, the import stage for ``obj``.

    ``obj.state`` moves through ``"FETCH"`` and ``"IMPORT"`` to
    ``"COMPLETE"`` or ``"ERROR"``. Start and finish timestamps are recorded
    for each stage and the object is saved after each step.

    ``obj.report_status`` ends up as one of:

    * ``'not modified'`` - a stage returned ``'unchanged'`` (returns early);
    * ``'errored'`` - the final state is ``'ERROR'``;
    * ``'deleted'`` - ``obj.current`` equals ``False``;
    * ``'updated'`` - at least two ``HarvestObject`` rows share
      ``obj.package_id``;
    * ``'added'`` - otherwise.
    """
    obj.fetch_started = datetime.datetime.utcnow()
    obj.state = "FETCH"
    obj.save()

    success_fetch = harvester.fetch_stage(obj)
    obj.fetch_finished = datetime.datetime.utcnow()
    obj.save()

    if success_fetch is True:
        # If no errors where found, call the import method
        obj.import_started = datetime.datetime.utcnow()
        obj.state = "IMPORT"
        obj.save()

        success_import = harvester.import_stage(obj)
        obj.import_finished = datetime.datetime.utcnow()
        if success_import:
            obj.state = "COMPLETE"
            # Compare by value, not identity: `is 'unchanged'` only matched
            # when the harvester happened to return the interned literal.
            if success_import == 'unchanged':
                obj.report_status = 'not modified'
                obj.save()
                return
        else:
            obj.state = "ERROR"
        obj.save()
    elif success_fetch == 'unchanged':
        obj.state = 'COMPLETE'
        obj.report_status = 'not modified'
        obj.save()
        return
    else:
        obj.state = "ERROR"
        obj.save()

    if obj.state == 'ERROR':
        obj.report_status = 'errored'
    elif obj.current == False:  # noqa: E712 - equality test kept as-is
        obj.report_status = 'deleted'
    elif len(model.Session.query(HarvestObject)
             .filter_by(package_id=obj.package_id)
             .limit(2)
             .all()) == 2:
        # Two rows are enough to know another harvest object already
        # exists for this package.
        obj.report_status = 'updated'
    else:
        obj.report_status = 'added'
    obj.save()
def transfer_to_redis(request):
    """Rewrite the client configuration keys in Redis and report the outcome.

    The entries returned by ``get_config_redis_json()`` are written as
    ``"<CLIENT_CONFIG_REDIS_PREFIX>:<app_id>"`` keys. Before that, a Lua
    script deleting every key matching ``"<CLIENT_CONFIG_REDIS_PREFIX>:*"``
    is queued on the same transactional pipeline, so the delete and the
    writes are sent together by ``pipe.execute()``.

    ``request`` is not used by the body.

    :return: ``http_response_json({'success': ..., 'msg': ...})``;
        ``success`` is True only if the pipeline executed without raising.
    """
    success, msg = False, ''
    try:
        config_data = get_config_redis_json()
        logger.debug(config_data)
        r = redis.StrictRedis(host=settings.REDIS_HOST,
                              port=settings.REDIS_PORT,
                              db=settings.REDIS_DB,
                              password=settings.REDIS_PASSWORD)

        # Queue every command on a pipeline created with transaction=True.
        pipe = r.pipeline(transaction=True)

        # Script that deletes all keys matching the pattern in ARGV[1].
        pattern_delete_lua = """
        local keys = redis.call('keys', ARGV[1])
        for i = 1, table.getn(keys) do
            redis.call('del', keys[i])
        end
        """
        pattern_delete = r.register_script(pattern_delete_lua)
        # client=pipe runs the script through the pipeline instead of
        # directly on the connection.
        pattern_delete(keys=[''],
                       args=['%s:*' % settings.CLIENT_CONFIG_REDIS_PREFIX],
                       client=pipe)

        for t in config_data:
            logger.debug(t)
            # NOTE: only the per-client key is written. The per-endpoint
            # keys under PROXY_CONFIG_REDIS_PREFIX were previously
            # commented out and are not written here.
            pipe.set('%s:%s' % (settings.CLIENT_CONFIG_REDIS_PREFIX,
                                t['app_id']),
                     json_dumps(t))

        # the EXECUTE call sends all buffered commands to the server,
        # returning a list of responses, one for each command.
        pipe.execute()
        success = True
    except Exception as e:
        # NOTE(review): this user-facing message looks mis-encoded in the
        # source; restore the intended text from the original file.
        msg = '??????? Redis ????'
        # Log the exception object itself: `e.message` does not exist on
        # Python 3 exceptions and would raise inside this handler.
        logger.error(e)
        logger.error(traceback.format_exc())
    return http_response_json({'success': success, 'msg': msg})