/**
 * Queues GET, DEL and SET for the key {@code "test"} inside a single MULTI/EXEC
 * transaction and prints the reply of every queued command.
 *
 * <p>Replies are printed null-safely: a queued GET on a key that does not exist
 * contributes a {@code null} entry to the EXEC results, which must not break the loop.
 */
@Test
public void testRedisMap() {
    Object object = redisTemplate.execute(new SessionCallback() {
        @Override
        public Object execute(RedisOperations operations) throws DataAccessException {
            operations.multi();
            operations.opsForValue().get("test");
            operations.delete("test");
            operations.opsForValue().set("test", "6");
            // The replies are of mixed types (value, delete reply, set reply), hence List<Object>.
            List<Object> rs = operations.exec();
            // String concatenation is null-safe, unlike rs.toString().
            System.out.println(" rs:" + rs);
            return rs;
        }
    });

    List<Object> strings = (List<Object>) object;
    if (strings == null) {
        // NOTE(review): exec() can yield null (e.g. an aborted transaction) — nothing to print then.
        return;
    }
    for (Object str : strings) {
        // Individual replies may be null (GET of a missing key), so avoid str.toString().
        System.err.println(String.valueOf(str));
    }
}
public Object put(final String key, final Object value) throws CacheException { final String realKey = key(key); return redisTemplate.execute(new SessionCallback<Object>(){ public Object execute(RedisOperations operations) throws DataAccessException { operations.multi();//事务开启 operations.opsForValue().get(realKey); if(cacheExpire){ operations.opsForValue().set(realKey, value, cacheExpireSeconds, TimeUnit.SECONDS); }else{ operations.opsForValue().set(realKey, value); } List<Object> results = operations.exec(); //结束事务 return results.get(0); } }); }
@Test public void testUseTransactionSimple() { redisTemplate.execute(new SessionCallback<List<Object>>() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public List<Object> execute(RedisOperations operations) throws DataAccessException { operations.multi(); operations.opsForValue().set("test1", "value1"); operations.opsForSet().add("testSet1", "value1", "value2"); // 这里获取 test1 为 null,因为事务还没有提交 assertThat(operations.opsForValue().get("test1")).isNull(); return operations.exec(); } }); assertThat(redisTemplate.opsForValue().get("test1")).isEqualTo("value1"); MatcherAssert.assertThat(redisTemplate.opsForSet().members("testSet1"), Matchers.containsInAnyOrder("value1", "value2")); }
public Object remove(final String key) throws CacheException { final String realKey = key(key); return redisTemplate.execute(new SessionCallback<Object>(){ public Object execute(RedisOperations operations) throws DataAccessException { operations.multi();//事务开启 operations.opsForValue().get(realKey); operations.delete(realKey); List<Object> results = operations.exec(); //结束事务 return results.get(0); } }); }
/**
 * Hands {@code session} to {@code redisTemplate.execute} and returns whatever that call returns.
 *
 * <p>Any {@link Exception} raised by the call is rethrown wrapped in a
 * {@code RedisBaoException} that carries the original exception as its constructor argument.
 *
 * @param session callback to run
 * @param <T>     result type of the callback
 * @return the value returned by {@code redisTemplate.execute(session)}
 */
@Override
public <T> T execute(SessionCallback<T> session) {
    T outcome;
    try {
        outcome = redisTemplate.execute(session);
    } catch (Exception ex) {
        throw new RedisBaoException(ex);
    }
    return outcome;
}
@Test public void canRetryAfterRedisConnectionError() throws InterruptedException { // given doThrow(RedisConnectionFailureException.class).when(redisTemplate).execute(any(SessionCallback.class)); // execute scheduler.initialize(); Thread.sleep(500); // assert verify(redisTemplate, times(MAX_RETRIES)).execute(any(SessionCallback.class)); }
@Test public void canRecoverAfterSingleConnectionError() throws InterruptedException { // given when(redisTemplate.execute(any(SessionCallback.class))) .thenThrow(RedisConnectionFailureException.class) .thenReturn(true); // execute scheduler.initialize(); Thread.sleep(500); // assert verify(redisTemplate, atLeast(MAX_RETRIES + 1)).execute(any(SessionCallback.class)); }
@Test public void testWatch() throws InterruptedException { final String watchKey = "watchKey"; redisTemplate.opsForValue().set(watchKey, "value0"); Thread t1 = new Thread(new Runnable() { @Override public void run() { List<Object> result = redisTemplate.execute(new SessionCallback<List<Object>>() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public List<Object> execute(RedisOperations operations) throws DataAccessException { operations.watch(watchKey); // watch String oldValue = (String) operations.opsForValue().get(watchKey); String newValue = oldValue + "-11"; operations.multi(); // 开启事物 operations.opsForValue().set(watchKey, newValue); // 修改值 try { Thread.sleep(1000); // 让子线程暂停下,以便其他线程修改 watchKey 的值 } catch (InterruptedException e) { e.printStackTrace(); } return operations.exec(); // 提交事物 } }); assertThat(result).isNull(); // 事物提交失败,返回null,否则会返回一个空 List } }); t1.start(); Thread.sleep(100); redisTemplate.opsForValue().set(watchKey, "value2"); // 主线程先修改 watchKey 的值,子线程提交时会观察到 watchKey 的值已经改变,所以子线程事物提交会失败 t1.join(); assertThat(redisTemplate.opsForValue().get(watchKey)).isEqualTo("value2"); }
/**
 * Runs the given session callback and returns a value of the callback's result type.
 *
 * <p>NOTE(review): error handling and transactional behaviour are defined by the
 * implementations of this method — confirm there before relying on either.
 *
 * @param session the callback to run
 * @param <T>     result type produced by the callback
 * @return the result of type {@code T}
 */
<T> T execute(SessionCallback<T> session);