@Override public BoundSetOperations<K, V> boundSetOps(K key) { try { return redisTemplate.boundSetOps(key); } catch (Exception ex) { throw new RedisBaoException(ex); } }
@Test public void testRedisDelEventsAreDispatchedInSessionTaskExecutor() throws InterruptedException { BoundSetOperations<Object, Object> ops = this.redis.boundSetOps( "spring:session:RedisListenerContainerTaskExecutorITests:expirations:dummy"); ops.add("value"); ops.remove("value"); assertThat(this.executor.taskDispatched()).isTrue(); }
@Override public BoundSetOperations<K, V> boundSetOps(K key) { throw new MethodNotSupportException("myRedisTemplate not support this method : boundSetOps(K key) , please use opsForXX"); //return new DefaultBoundSetOperations<K, V>(key, this); }
/** * Finds message IDs in the Redis store for which no response from consumers * has arrived since the configured timeout value */ @Scheduled(fixedRateString="${consistemncy.checker.rate:1000}") public void checkDeliveredMessages() { if( redisTemplate == null ) { return; } try { for(int i=0; i<numPublishers; i++) { Date checkTime = new Date(); long checkTimeLong = checkTime.getTime(); long checkSince = checkTimeLong - timeSince; BoundZSetOperations<String, Long> publishedZSetOps = redisTemplate.boundZSetOps(utils.getPublishedZKey(i)); BoundSetOperations<String, Long> publishedSetOps = redisTemplate.boundSetOps(utils.getPublishedKey(i)); // Get the ids of the messages published longer than timeout to // wait for their reception Set<Long> oldPublishedIds = publishedZSetOps.rangeByScore(0, checkSince); Set<Long> oldUnrespondedIds = new HashSet<>( oldPublishedIds ); for(int j=0; j<numConsumers; j++) { log.debug("Checking messages published by {} at {} {} ({}) since ({})", utils.getPublishedKey(i), utils.getReceivedKey(j), checkTime, checkTimeLong, checkSince); BoundSetOperations<String, Long> receivedSetOps = redisTemplate.boundSetOps(utils.getReceivedKey(j)); // Get the Set difference between all published ID minus all responded ids Set<Long> unresponded = publishedSetOps.diff( utils.getReceivedKey(j) ); // Filter out recent IDs for which the timeout hasn't fired yet oldUnrespondedIds.retainAll(unresponded); if( !oldUnrespondedIds.isEmpty() ) { log.error("NO RESPONSE in {} FOR {} MESSAGES: {}", utils.getReceivedKey(j), utils.getPublishedKey(i), oldPublishedIds); } // Clean old checked records receivedSetOps.remove(oldPublishedIds); } publishedZSetOps.removeRangeByScore(0, checkSince); publishedSetOps.remove(oldPublishedIds); } } catch(Exception ex) { log.warn("Consistency could not be checked: {}", ex.getMessage()); } }
BoundSetOperations<K, V> boundSetOps(K key);