/**
 * Loads every metric stored for the given group.
 *
 * <p>The member keys are read from the group's sorted set, their serialized values are
 * fetched with a single multi-get, and each member's score is looked up individually.
 * The loop pairs values with keys positionally, i.e. it relies on the multi-get
 * returning values in the same order as the keys it was given.
 *
 * @param group the metric group name
 * @return the deserialized metrics, one per value returned by the multi-get
 */
@Override
public Iterable<Metric<?>> findAll(String group) {
    BoundZSetOperations<String, String> groupOps =
            this.redisOperations.boundZSetOps(keyFor(group));
    Set<String> memberKeys = groupOps.range(0, -1);
    Iterator<String> keyCursor = memberKeys.iterator();
    List<Metric<?>> metrics = new ArrayList<Metric<?>>(memberKeys.size());
    List<String> rawValues = this.redisOperations.opsForValue().multiGet(memberKeys);

    Iterator<String> valueCursor = rawValues.iterator();
    while (valueCursor.hasNext()) {
        String rawValue = valueCursor.next();
        String memberKey = keyCursor.next();
        metrics.add(deserialize(group, memberKey, rawValue, groupOps.score(memberKey)));
    }
    return metrics;
}
/**
 * Writes the given metrics into the group.
 *
 * <p>The group key is passed to {@code trackMembership}; then, for each metric, its key is
 * added to the group's sorted set with the metric value as score and the serialized
 * metric is stored as a plain value under that same key.
 *
 * @param group  the metric group name
 * @param values the metrics to store
 */
@Override
public void set(String group, Collection<Metric<?>> values) {
    String groupKey = keyFor(group);
    trackMembership(groupKey);
    BoundZSetOperations<String, String> groupOps =
            this.redisOperations.boundZSetOps(groupKey);

    for (Metric<?> current : values) {
        String serialized = serialize(current);
        String memberKey = keyFor(current.getName());
        double score = current.getValue().doubleValue();
        groupOps.add(memberKey, score);
        this.redisOperations.opsForValue().set(memberKey, serialized);
    }
}
/**
 * Applies a delta to one metric of the group.
 *
 * <p>The group key is passed to {@code trackMembership}; the member's score in the group's
 * sorted set is incremented by the delta value, and a {@code Metric<Double>} carrying the
 * resulting score and the delta's timestamp is serialized and stored under the member key.
 *
 * @param group the metric group name
 * @param delta the change to apply; its name selects the member
 */
@Override
public void increment(String group, Delta<?> delta) {
    String groupKey = keyFor(group);
    trackMembership(groupKey);
    BoundZSetOperations<String, String> groupOps =
            this.redisOperations.boundZSetOps(groupKey);

    String memberKey = keyFor(delta.getName());
    double amount = delta.getValue().doubleValue();
    double updatedScore = groupOps.incrementScore(memberKey, amount);

    Metric<Double> snapshot =
            new Metric<Double>(delta.getName(), updatedScore, delta.getTimestamp());
    String serialized = serialize(snapshot);
    this.redisOperations.opsForValue().set(memberKey, serialized);
}
/**
 * Removes a group and everything stored for it.
 *
 * <p>When the group key exists, every member key listed in the group's sorted set is
 * deleted, followed by the group key itself. Regardless of whether the group key existed,
 * the group key is finally removed from the {@code zSetOperations} field of this object.
 *
 * @param group the metric group name
 */
@Override
public void reset(String group) {
    String groupKey = keyFor(group);

    boolean groupExists = this.redisOperations.hasKey(groupKey);
    if (groupExists) {
        // Deliberately named differently from the zSetOperations field used below.
        BoundZSetOperations<String, String> groupOps =
                this.redisOperations.boundZSetOps(groupKey);
        Set<String> memberKeys = groupOps.range(0, -1);
        for (String memberKey : memberKeys) {
            this.redisOperations.delete(memberKey);
        }
        this.redisOperations.delete(groupKey);
    }

    this.zSetOperations.remove(groupKey);
}
/**
 * Stores one scheduling entry.
 *
 * <p>The entry's id is added to the sorted set identified by {@code orderingKey(...)} with
 * the score from {@code calculateScore(...)}, and the entry itself is stored as a value
 * under its id. {@code persist()} is invoked on both bound operations after writing.
 *
 * @param emailSchedulingData the entry to store
 */
protected void addOps(final EmailSchedulingData emailSchedulingData) {
    final String zsetKey = orderingKey(emailSchedulingData);
    final String entryId = emailSchedulingData.getId();
    final double entryScore = calculateScore(emailSchedulingData);

    // Ordering index: id -> score.
    BoundZSetOperations<String, String> orderingOps = orderingTemplate.boundZSetOps(zsetKey);
    orderingOps.add(entryId, entryScore);
    orderingOps.persist();

    // Payload: id -> scheduling data.
    BoundValueOperations<String, EmailSchedulingData> payloadOps =
            valueTemplate.boundValueOps(entryId);
    payloadOps.set(emailSchedulingData);
    payloadOps.persist();
}
/**
 * Reads up to {@code batchMaxSize} entries from the head of the given ordering set.
 *
 * <p>The ids at ranks {@code 0 .. batchMaxSize - 1} are read from the sorted set and each id
 * is resolved through {@code getOps(id)}; ids that resolve to {@code null} are dropped.
 *
 * @param orderingKey  key of the sorted set holding the ordered ids
 * @param batchMaxSize maximum number of ids to read; must be positive
 * @return the resolved entries, collected into a set
 * @throws IllegalArgumentException if {@code batchMaxSize} is not positive
 */
protected Collection<EmailSchedulingData> getNextBatchOps(final String orderingKey, final int batchMaxSize) {
    Preconditions.checkArgument(batchMaxSize > 0, "Batch size should be a positive integer.");

    final BoundZSetOperations<String, String> orderingZSetOps = orderingTemplate.boundZSetOps(orderingKey);

    // ZRANGE clamps a stop index beyond the last element to the end of the set (and returns
    // an empty result for an empty set), so asking for the set size first is unnecessary:
    // it costs an extra round trip, unboxes a nullable Long, and can be stale by the time
    // the range is read.
    final Set<String> valueIds = orderingZSetOps.range(0, batchMaxSize - 1L);

    return valueIds.stream()
            .map(id -> getOps(id))
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
}
/**
 * Delegates to the wrapped {@code redisTemplate} to obtain the sorted-set operations bound
 * to {@code key}.
 *
 * @param key the key to bind the operations to
 * @return the bound operations returned by the wrapped template
 * @throws RedisBaoException wrapping any exception thrown by the delegate call
 */
@Override
public BoundZSetOperations<K, V> boundZSetOps(K key) {
    final BoundZSetOperations<K, V> boundOps;
    try {
        boundOps = redisTemplate.boundZSetOps(key);
    } catch (Exception ex) {
        throw new RedisBaoException(ex);
    }
    return boundOps;
}
/**
 * Returns the sorted-set operations bound to {@code onlineUserListKey}.
 *
 * @return the bound operations obtained from {@code redisTemplate}
 */
public BoundZSetOperations<String, String> getUserSet() {
    final BoundZSetOperations<String, String> onlineUsers =
            redisTemplate.boundZSetOps(onlineUserListKey);
    return onlineUsers;
}
@Override public BoundZSetOperations<K, V> boundZSetOps(K key) { throw new MethodNotSupportException("myRedisTemplate not support this method : boundZSetOps(K key) , please use opsForXX"); //return new DefaultBoundZSetOperations<K, V>(key, this); }
/** * Finds message IDs in the Redis store for which no response from consumers * has arrived since the configured timeout value */ @Scheduled(fixedRateString="${consistemncy.checker.rate:1000}") public void checkDeliveredMessages() { if( redisTemplate == null ) { return; } try { for(int i=0; i<numPublishers; i++) { Date checkTime = new Date(); long checkTimeLong = checkTime.getTime(); long checkSince = checkTimeLong - timeSince; BoundZSetOperations<String, Long> publishedZSetOps = redisTemplate.boundZSetOps(utils.getPublishedZKey(i)); BoundSetOperations<String, Long> publishedSetOps = redisTemplate.boundSetOps(utils.getPublishedKey(i)); // Get the ids of the messages published longer than timeout to // wait for their reception Set<Long> oldPublishedIds = publishedZSetOps.rangeByScore(0, checkSince); Set<Long> oldUnrespondedIds = new HashSet<>( oldPublishedIds ); for(int j=0; j<numConsumers; j++) { log.debug("Checking messages published by {} at {} {} ({}) since ({})", utils.getPublishedKey(i), utils.getReceivedKey(j), checkTime, checkTimeLong, checkSince); BoundSetOperations<String, Long> receivedSetOps = redisTemplate.boundSetOps(utils.getReceivedKey(j)); // Get the Set difference between all published ID minus all responded ids Set<Long> unresponded = publishedSetOps.diff( utils.getReceivedKey(j) ); // Filter out recent IDs for which the timeout hasn't fired yet oldUnrespondedIds.retainAll(unresponded); if( !oldUnrespondedIds.isEmpty() ) { log.error("NO RESPONSE in {} FOR {} MESSAGES: {}", utils.getReceivedKey(j), utils.getPublishedKey(i), oldPublishedIds); } // Clean old checked records receivedSetOps.remove(oldPublishedIds); } publishedZSetOps.removeRangeByScore(0, checkSince); publishedSetOps.remove(oldPublishedIds); } } catch(Exception ex) { log.warn("Consistency could not be checked: {}", ex.getMessage()); } }
/**
 * Returns the {@link BoundZSetOperations} bound to the given key.
 *
 * @param key the key the returned operations are bound to
 * @return the bound ZSet operations for {@code key}
 */
BoundZSetOperations<K, V> boundZSetOps(K key);