/** * Use transactions to add index file and then delete the old one * * @param fileLengthKey the key using for hash file length * @param fileDataKey the key using for hash file data * @param oldField the old hash field * @param newField the new hash field * @param values the data values of the old hash field * @param fileLength the data length of the old hash field */ @Override public void rename(String fileLengthKey, String fileDataKey, String oldField, String newField, List<byte[]> values, long fileLength) { ShardedJedis shardedJedis = getShardedJedis(); ShardedJedisPipeline pipelined = shardedJedis.pipelined(); //add new file length pipelined.hset(fileLengthKey.getBytes(), newField.getBytes(), Longs.toByteArray(fileLength)); //add new file content Long blockSize = getBlockSize(fileLength); for (int i = 0; i < blockSize; i++) { pipelined.hset(fileDataKey.getBytes(), getBlockName(newField, i), compressFilter(values.get(i))); } pipelined.sync(); shardedJedis.close(); values.clear(); deleteFile(fileLengthKey, fileDataKey, oldField, blockSize); }
@Override public void saveFile(String fileLengthKey, String fileDataKey, String fileName, List<byte[]> values, long fileLength) { ShardedJedis shardedJedis = getShardedJedis(); ShardedJedisPipeline pipelined = shardedJedis.pipelined(); pipelined.hset(fileLengthKey.getBytes(), fileName.getBytes(), Longs.toByteArray(fileLength)); Long blockSize = getBlockSize(fileLength); for (int i = 0; i < blockSize; i++) { pipelined.hset(fileDataKey.getBytes(), getBlockName(fileName, i), compressFilter(values.get(i))); if (i % Constants.SYNC_COUNT == 0) { pipelined.sync(); pipelined = shardedJedis.pipelined(); } } pipelined.sync(); shardedJedis.close(); values.clear(); }
@Test public void testSyncWithNoCommandQueued() { JedisShardInfo shardInfo1 = new JedisShardInfo(redis1.getHost(), redis1.getPort()); JedisShardInfo shardInfo2 = new JedisShardInfo(redis2.getHost(), redis2.getPort()); shardInfo1.setPassword("foobared"); shardInfo2.setPassword("foobared"); List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(); shards.add(shardInfo1); shards.add(shardInfo2); ShardedJedis jedis2 = new ShardedJedis(shards); ShardedJedisPipeline pipeline = jedis2.pipelined(); pipeline.sync(); jedis2.close(); jedis2 = new ShardedJedis(shards); pipeline = jedis2.pipelined(); List<Object> resp = pipeline.syncAndReturnAll(); assertTrue(resp.isEmpty()); jedis2.close(); }
/** * 在hash中存入单个键值对 * * @param key * @param field * @param value * @param expireTime */ public void hsetByte(String key, String field, Object value, int expireTime) { ShardedJedis shardedJedis = shardedJedisPool.getResource(); if (shardedJedis == null) { return; } try { ShardedJedisPipeline pipelined = shardedJedis.pipelined(); pipelined.hset(serializer.serialize(key), serializer.serialize(field), serializer.serialize(value)); pipelined.expire(serializer.serialize(key), expireTime); } catch (Exception e) { log.error(e.getMessage(), e); } finally { shardedJedis.close(); } }
public void testShardPipelined() {// 0.127秒 JedisShardInfo jedis = new JedisShardInfo("", 6379); jedis.setPassword("b840fc02d52404542994"); List<JedisShardInfo> shards = Arrays.asList(jedis); ShardedJedis sharding = new ShardedJedis(shards); ShardedJedisPipeline pipeline = sharding.pipelined(); long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { pipeline.set("n" + i, "n" + i); System.out.println(i); } pipeline.syncAndReturnAll(); long end = System.currentTimeMillis(); System.out.println("共花费:" + (end - start) / 1000.0 + "秒"); sharding.disconnect(); try { Closeables.close(sharding, true); } catch (IOException e) { e.printStackTrace(); } }
@Override public Map<String, String> get(final String... keys) { Assert.notEmpty(keys); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipelined = jedis.pipelined(); final Map<String, Response<String>> values = Maps.newHashMap(); for (String key : keys) { values.put(key, pipelined.get(key)); } pipelined.sync(); final Map<String, String> valueMap = Maps.newHashMap(); values.forEach((key, response) -> valueMap.put(key, response.get())); return valueMap; } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public Map<String, Boolean> set(final Map<String, Object> map) { Assert.notEmpty(map); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipelined = jedis.pipelined(); final Map<String, Response<String>> responses = Maps.newHashMap(); map.forEach((key, value) -> responses.put(key, pipelined.set(key, toJSONString(value)))); pipelined.sync(); final Map<String, Boolean> values = Maps.newHashMap(); responses.forEach((key, response) -> values.put(key, isOK(response.get()))); return values; } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public boolean setByNX(final String key, final String value, final int timeout) { Assert.hasText(key); Assert.hasText(value); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipeline = jedis.pipelined(); final Response<Long> set = pipeline.setnx(key, value); final Response<Long> expire = pipeline.expire(key, timeout); pipeline.sync(); return isSuccess(set.get()) && isSuccess(expire.get()); } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public Map<String, Boolean> setByNX(final Map<String, Object> map) { Assert.notEmpty(map); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipelined = jedis.pipelined(); final Map<String, Response<Long>> responses = Maps.newHashMap(); for (Entry<String, Object> entry : map.entrySet()) { responses.put(entry.getKey(), pipelined.setnx(entry.getKey(), toJSONString(entry.getValue()))); } pipelined.sync(); final Map<String, Boolean> values = Maps.newHashMap(); responses.forEach((key, response) -> values.put(key, isSuccess(response.get()))); return values; } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public Map<String, Boolean> hsetByNX(final String key, final Map<String, Object> map) { Assert.hasText(key); Assert.notEmpty(map); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipeline = jedis.pipelined(); final Map<String, Response<Long>> responses = Maps.newHashMap(); for (Entry<String, Object> entry : map.entrySet()) { responses.put(entry.getKey(), pipeline.hsetnx(key, entry.getKey(), toJSONString(entry.getValue()))); } pipeline.sync(); final Map<String, Boolean> values = Maps.newHashMap(); responses.forEach((field, response) -> values.put(field, isSuccess(response.get()))); return values; } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public List<String> lrangeltrim(final String key, final int count) { Assert.hasText(key); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipeline = jedis.pipelined(); final Response<List<String>> values = pipeline.lrange(key, 0, count); pipeline.ltrim(key, count + 1, -1); pipeline.sync(); return values.get(); } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public Map<String, Long> scard(final String... keys) { Assert.notEmpty(keys); ShardedJedis jedis = null; try { jedis = POOL.getJedis(config.getRedisType()); final ShardedJedisPipeline pipeline = jedis.pipelined(); final Map<String, Response<Long>> responses = Maps.newHashMap(); for (String key : keys) { responses.put(key, pipeline.scard(key)); } pipeline.sync(); final Map<String, Long> values = Maps.newHashMap(); responses.forEach((key, response) -> values.put(key, response.get())); return values; } catch (final Throwable e) { throw new RedisClientException(e.getMessage(), e); } finally { POOL.close(jedis); } }
@Override public String sets(final String... keyvalues) { return exec0(new ShardedJedisCallback<String>() { @Override public String doCallback(ShardedJedis jedis) { Object[] args = Arrays.asList(keyvalues).toArray(); final Map<String, Object> map = MapUtil.gmap(args); pipelined(new PiplineCallbackAdapter() { @Override public void doCallback(ShardedJedisPipeline pipeline) { for (Entry<String, Object> entry : map.entrySet()) { pipeline.set(entry.getKey(), entry.getValue().toString()); } } }); return new String(); } }); }
@Override public Long del(final String... keys) { return exec0(new ShardedJedisCallback<Long>() { @Override public Long doCallback(ShardedJedis jedis) { pipelined(new PiplineCallbackAdapter() { @Override public void doCallback(ShardedJedisPipeline pipeline) { for (String key : keys) { pipeline.del(key); } } }); return new Long(keys.length); } }); }
/** * 批量的 {@link #getString(String)} * * @param keys key数组 * @return value的集合 */ public List<String> batchGetString(final String[] keys) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<String> result = new ArrayList<String>(keys.length); List<Response<String>> responses = new ArrayList<Response<String>>(keys.length); for (String key : keys) { responses.add(pipeline.get(key)); } pipeline.sync(); for (Response<String> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); }
/** * 批量的{@link #hashMultipleGet(String, String...)},在管道中执行 * * @param pairs 多个hash的多个field * @return 执行结果的集合 */ public List<List<String>> batchHashMultipleGet(final List<Pair<String, String[]>> pairs) { return new Executor<List<List<String>>>(shardedJedisPool) { @Override List<List<String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<List<String>> result = new ArrayList<List<String>>(pairs.size()); List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(pairs.size()); for (Pair<String, String[]> pair : pairs) { responses.add(pipeline.hmget(pair.getKey(), pair.getValue())); } pipeline.sync(); for (Response<List<String>> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); }
/** * 批量的{@link #hashGetAll(String)} * * @param keys key的数组 * @return 执行结果的集合 */ public List<Map<String, String>> batchHashGetAll(final String... keys) { return new Executor<List<Map<String, String>>>(shardedJedisPool) { @Override List<Map<String, String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<Map<String, String>> result = new ArrayList<Map<String, String>>(keys.length); List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length); for (String key : keys) { responses.add(pipeline.hgetAll(key)); } pipeline.sync(); for (Response<Map<String, String>> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); }
/** * 批量的{@link #hashMultipleGet(String, String...)},与{@link #batchHashGetAll(String...)}不同的是,返回值为Map类型 * * @param keys key的数组 * @return 多个hash的所有filed和value */ public Map<String, Map<String, String>> batchHashGetAllForMap(final String... keys) { return new Executor<Map<String, Map<String, String>>>(shardedJedisPool) { @Override Map<String, Map<String, String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); // 设置map容量防止rehash int capacity = 1; while ((int) (capacity * 0.75) <= keys.length) { capacity <<= 1; } Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(capacity); List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length); for (String key : keys) { responses.add(pipeline.hgetAll(key)); } pipeline.sync(); for (int i = 0; i < keys.length; ++i) { result.put(keys[i], responses.get(i).get()); } return result; } }.getResult(); }
/** * 一次获得多个链表的数据 * * @param keys key的数组 * @return 执行结果 */ public Map<String, List<String>> batchGetAllList(final List<String> keys) { return new Executor<Map<String, List<String>>>(shardedJedisPool) { @Override Map<String, List<String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); Map<String, List<String>> result = new HashMap<String, List<String>>(); List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(keys.size()); for (String key : keys) { responses.add(pipeline.lrange(key, 0, -1)); } pipeline.sync(); for (int i = 0; i < keys.size(); ++i) { result.put(keys.get(i), responses.get(i).get()); } return result; } }.getResult(); }
/** * Use transactions to delete index file * * @param fileLengthKey the key using for hash file length * @param fileDataKey the key using for hash file data * @param field the hash field * @param blockSize the index file data block size */ @Override public void deleteFile(String fileLengthKey, String fileDataKey, String field, long blockSize) { ShardedJedis shardedJedis = getShardedJedis(); ShardedJedisPipeline pipelined = shardedJedis.pipelined(); //delete file length pipelined.hdel(fileLengthKey.getBytes(), field.getBytes()); //delete file content for (int i = 0; i < blockSize; i++) { byte[] blockName = getBlockName(field, i); pipelined.hdel(fileDataKey.getBytes(), blockName); } pipelined.sync(); shardedJedis.close(); }
@Override public List<byte[]> loadFileOnce(String fileDataKey, String fileName, long blockSize) { ShardedJedis shardedJedis = getShardedJedis(); ShardedJedisPipeline pipelined = shardedJedis.pipelined(); List<byte[]> res = new ArrayList<>(); List<Response<byte[]>> temps = new ArrayList<>(); int temp = 0; //如果不分批次sync容易read time out和Java heap space while (temp < blockSize) { Response<byte[]> data = pipelined.hget(fileDataKey.getBytes(), getBlockName(fileName, temp)); temps.add(data); if (temp % Constants.SYNC_COUNT == 0) { pipelined.sync(); res.addAll(temps.stream().map(response -> uncompressFilter(response.get())).collect(Collectors.toList())); temps.clear(); pipelined = shardedJedis.pipelined(); } temp++; } try { pipelined.sync(); } catch (JedisConnectionException e) { log.error("pipelined = {}, blockSize = {}!", pipelined.toString(), blockSize); log.error("", e); } finally { shardedJedis.close(); } res.addAll(temps.stream().map(response -> uncompressFilter(response.get())).collect(Collectors.toList())); temps.clear(); return res; }
@Test public void shardedPipeline() { List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(); shards.add(new JedisShardInfo(redis1.getHost(), redis1.getPort())); shards.add(new JedisShardInfo(redis2.getHost(), redis2.getPort())); shards.get(0).setPassword("foobared"); shards.get(1).setPassword("foobared"); ShardedJedis jedis = new ShardedJedis(shards); final List<String> keys = getKeysDifferentShard(jedis); jedis.set(keys.get(0), "a"); jedis.set(keys.get(1), "b"); assertNotSame(jedis.getShard(keys.get(0)), jedis.getShard(keys.get(1))); List<Object> results = jedis.pipelined(new ShardedJedisPipeline() { public void execute() { get(keys.get(0)); get(keys.get(1)); } }); List<Object> expected = new ArrayList<Object>(2); expected.add(SafeEncoder.encode("a")); expected.add(SafeEncoder.encode("b")); assertEquals(2, results.size()); assertArrayEquals(SafeEncoder.encode("a"), (byte[]) results.get(0)); assertArrayEquals(SafeEncoder.encode("b"), (byte[]) results.get(1)); }
@Test public void pipeline() throws UnsupportedEncodingException { ShardedJedisPipeline p = jedis.pipelined(); p.set("foo", "bar"); p.get("foo"); List<Object> results = p.syncAndReturnAll(); assertEquals(2, results.size()); assertEquals("OK", results.get(0)); assertEquals("bar", results.get(1)); }
@Test(expected = JedisDataException.class) public void pipelineResponseWithinPipeline() { jedis.set("string", "foo"); ShardedJedisPipeline p = jedis.pipelined(); Response<String> string = p.get("string"); string.get(); p.sync(); }
@Test public void canRetrieveUnsetKey() { ShardedJedisPipeline p = jedis.pipelined(); Response<String> shouldNotExist = p.get(UUID.randomUUID().toString()); p.sync(); assertNull(shouldNotExist.get()); }
public List<Object> pAddSet(String key, int seconds, String... values) { try (ShardedJedis shardedJedis = shardedJedisPool.getResource()) { ShardedJedisPipeline pipeline = shardedJedis.pipelined(); for(String value : values) { pipeline.sadd(key, value); } pipeline.expire(key, seconds); List<Object> result = pipeline.syncAndReturnAll(); result.remove(result.size() -1); return result; } }
public void testShardPipelinnedPool() {// 0.124秒 JedisShardInfo jedis = new JedisShardInfo("", 6379); jedis.setPassword("b840fc02d52404542994"); List<JedisShardInfo> shards = Arrays.asList(jedis); ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards); ShardedJedis sharding = pool.getResource(); ShardedJedisPipeline pipeline = sharding.pipelined(); long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { pipeline.set("n" + i, "n" + i); System.out.println(i); } pipeline.syncAndReturnAll(); long end = System.currentTimeMillis(); System.out.println("共花费:" + (end - start) / 1000.0 + "秒"); sharding.disconnect(); pool.destroy(); try { Closeables.close(sharding, true); Closeables.close(pool, true); } catch (IOException e) { e.printStackTrace(); } }