/**
 * Verifies that closing a borrowed connection hands it back to the pool: with a pool limited
 * to a single connection, the second borrow is expected to yield the same instance.
 */
@Test
public void checkResourceIsCloseable() {
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    // Single connection, no blocking when exhausted.
    config.setMaxTotal(1);
    config.setBlockWhenExhausted(false);
    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
            "foobared", 2);
    try {
        Jedis jedis = pool.getResource();
        try {
            jedis.set("hello", "jedis");
        } finally {
            jedis.close();
        }

        Jedis jedis2 = pool.getResource();
        try {
            assertEquals(jedis, jedis2);
        } finally {
            jedis2.close();
        }
    } finally {
        // Release the pool even when a command or assertion fails.
        pool.destroy();
    }
}
/**
 * Checks that the client name passed to the pool constructor is reported by a borrowed
 * connection, and that the pool reports itself closed after being destroyed.
 */
@Test
public void customClientName() {
    final String expectedClientName = "my_shiny_client_name";

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxTotal(1);
    poolConfig.setBlockWhenExhausted(false);

    JedisSentinelPool sentinelPool = new JedisSentinelPool(MASTER_NAME, sentinels, poolConfig,
            1000, "foobared", 0, expectedClientName);
    Jedis connection = sentinelPool.getResource();
    try {
        assertEquals(expectedClientName, connection.clientGetname());
    } finally {
        connection.close();
        sentinelPool.destroy();
    }

    assertTrue(sentinelPool.isClosed());
}
private void forceFailover(JedisSentinelPool pool) throws InterruptedException { HostAndPort oldMaster = pool.getCurrentHostMaster(); // jedis connection should be master Jedis beforeFailoverJedis = pool.getResource(); assertEquals("PONG", beforeFailoverJedis.ping()); waitForFailover(pool, oldMaster); Jedis afterFailoverJedis = pool.getResource(); assertEquals("PONG", afterFailoverJedis.ping()); assertEquals("foobared", afterFailoverJedis.configGet("requirepass").get(1)); assertEquals(2, afterFailoverJedis.getDB().intValue()); // returning both connections to the pool should not throw beforeFailoverJedis.close(); afterFailoverJedis.close(); }
/**
 * Creates the Redis connection pool and stores it in {@code connectionPool}.
 *
 * <p>When a sentinel master name is configured, a {@link JedisSentinelPool} is built from the
 * configured sentinel set; otherwise a plain {@link JedisPool} is built from host and port.
 *
 * @throws LifecycleException if a sentinel master is configured without any sentinels, or if
 *     creating the pool fails (the original failure is attached as the cause)
 */
private void initializeDatabaseConnection() throws LifecycleException {
    try {
        if (getSentinelMaster() != null) {
            Set<String> sentinelSet = getSentinelSet();
            if (sentinelSet == null || sentinelSet.isEmpty()) {
                throw new LifecycleException(
                        "Error configuring Redis Sentinel connection pool: expected both `sentinelMaster` and `sentinels` to be configured");
            }
            connectionPool = new JedisSentinelPool(getSentinelMaster(), sentinelSet,
                    this.connectionPoolConfig, getTimeout(), getPassword());
        } else {
            connectionPool = new JedisPool(this.connectionPoolConfig, getHost(), getPort(),
                    getTimeout(), getPassword());
        }
    } catch (LifecycleException e) {
        // Configuration errors already carry a precise message; do not re-wrap them as a
        // connection failure.
        throw e;
    } catch (Exception e) {
        // The cause travels with the exception, so no separate stack-trace dump is needed.
        throw new LifecycleException("Error connecting to Redis", e);
    }
}
/**
 * Builds a sentinel pool through {@code ClientBuilder}, logs the current master and pushes ten
 * entries onto the list {@code mylist}.
 */
@Test
public void testSentinel() {
    JedisSentinelPool sentinelPool = ClientBuilder.redisSentinel(appId)
            .setConnectionTimeout(2000)
            .setSoTimeout(1000)
            .build();
    try {
        HostAndPort currentHostMaster = sentinelPool.getCurrentHostMaster();
        logger.info("current master: {}", currentHostMaster);

        Jedis jedis = sentinelPool.getResource();
        try {
            for (int i = 0; i < 10; i++) {
                jedis.lpush("mylist", "list-" + i);
            }
        } finally {
            // Return the connection even if a command fails.
            jedis.close();
        }
    } finally {
        // Always tear the pool down so a failing test does not leak it.
        sentinelPool.destroy();
    }
}
private void forceFailover(JedisSentinelPool pool) throws InterruptedException { HostAndPort oldMaster = pool.getCurrentHostMaster(); // jedis connection should be master Jedis beforeFailoverJedis = pool.getResource(); assertEquals("PONG", beforeFailoverJedis.ping()); waitForFailover(pool, oldMaster); Jedis afterFailoverJedis = pool.getResource(); assertEquals("PONG", afterFailoverJedis.ping()); assertEquals("foobared", afterFailoverJedis.configGet("requirepass").get(1)); assertEquals(2, afterFailoverJedis.getDB()); // returning both connections to the pool should not throw beforeFailoverJedis.close(); afterFailoverJedis.close(); }
/**
 * Builds the connection wrapper and its underlying pool from the given configuration.
 *
 * <p>The retry count and the wait-before-retry delay are copied from the configuration; the
 * pool is either a {@code JedisPool} or a {@code JedisSentinelPool} depending on the
 * configured type.
 *
 * @param redisConfiguration configuration
 * @throws IllegalArgumentException if the configured type is neither REDIS nor SENTINEL
 */
public ManageableJedisConnection(final RedisConfiguration redisConfiguration) {
    this.nRetry = redisConfiguration.getRetry();
    this.waitBeforeRetryMs = redisConfiguration.getWaitBeforeRetryMs();

    switch (redisConfiguration.getType()) {
        case SENTINEL:
            LOGGER.info("Creates Redis Sentinel connection pool");
            this.pool = new JedisSentinelPool(
                    redisConfiguration.getMasterName(),
                    redisConfiguration.getSentinels(),
                    new JedisPoolConfig(),
                    redisConfiguration.getTimeout());
            break;
        case REDIS:
            LOGGER.info("Creates Simple Redis connection pool");
            this.pool = new JedisPool(
                    new JedisPoolConfig(),
                    redisConfiguration.getHost(),
                    redisConfiguration.getPort(),
                    redisConfiguration.getTimeout());
            break;
        default:
            throw new IllegalArgumentException("Unexpected jedis pool type provided in configuration file");
    }
}
/**
 * Creates a new RedisTokenRepository backed by a Redis Sentinel pool.
 *
 * <p>Each configured host/port pair is turned into a {@code host:port} string and passed to
 * the pool together with the master name, timeout, password and database from the config.
 *
 * @param config The Redis configuration to use
 */
public RedisTokenRepository(final RedisTokenRepositorySentinelConfig config) {
    final Set<String> sentinelAddresses = new HashSet<>();
    for (final RedisTokenRepositorySentinelConfig.HostAndPort node : config.getHostsAndPorts()) {
        final String address = node.getHost() + ':' + node.getPort();
        sentinelAddresses.add(address);
    }

    this.jedisPool = new JedisSentinelPool(config.getMasterName(), sentinelAddresses,
            new JedisPoolConfig(), config.getTimeout(), config.getPassword(),
            config.getDatabase());
}
/**
 * Initializes the DB connection.
 *
 * <p>Creates the Redis connection pool and stores it in {@code connectionPool}. When a
 * sentinel master name is configured, a {@link JedisSentinelPool} is built from the configured
 * sentinel set; otherwise a plain {@link JedisPool} is built from host and port.
 *
 * @throws LifecycleException if a sentinel master is configured without any sentinels, or if
 *     creating the pool fails (the original failure is attached as the cause)
 */
protected void initializeDatabaseConnection() throws LifecycleException {
    try {
        if (getSentinelMaster() != null) {
            Set<String> sentinelSet = getSentinelSet();
            if (sentinelSet == null || sentinelSet.isEmpty()) {
                throw new LifecycleException(
                        "Error configuring Redis Sentinel connection pool: expected both `sentinelMaster` and `sentinels` to be configured");
            }
            connectionPool = new JedisSentinelPool(getSentinelMaster(), sentinelSet,
                    this.connectionPoolConfig, getTimeout(), getPassword());
        } else {
            connectionPool = new JedisPool(this.connectionPoolConfig, getHost(), getPort(),
                    getTimeout(), getPassword());
        }
    } catch (LifecycleException e) {
        // Configuration errors already carry a precise message; do not re-wrap them as a
        // connection failure.
        throw e;
    } catch (Exception e) {
        // The cause travels with the exception, so no separate stack-trace dump is needed.
        throw new LifecycleException("Error connecting to Redis", e);
    }
}
/**
 * Builds a sentinel-backed pool from a {@code ;}-separated server address.
 *
 * <p>The first entry may be prefixed with {@code masterName@}; when the prefix is absent,
 * {@code DEFAULT_MASTER_NAME} is used. All entries (with the prefix stripped from the first)
 * become the sentinel set.
 *
 * @param jedisPoolConfig pool configuration handed to the sentinel pool
 * @param serverAddress sentinel addresses separated by {@code ;}
 * @return a new {@link JedisSentinelPool}
 */
@Override
protected Pool<Jedis> getPool(JedisPoolConfig jedisPoolConfig, String serverAddress) {
    String[] sentinelHosts = serverAddress.split(";");
    String[] firstEntryParts = sentinelHosts[0].split("@");

    String masterName = DEFAULT_MASTER_NAME;
    if (firstEntryParts.length >= 2) {
        masterName = firstEntryParts[0];
        // keep only the host part of the first entry
        sentinelHosts[0] = firstEntryParts[1];
    }

    Set<String> sentinelSet = new HashSet<String>(Arrays.asList(sentinelHosts));
    return new JedisSentinelPool(masterName, sentinelSet, jedisPoolConfig, timeout);
}
/**
 * Expects a {@code JedisConnectionException} when the pool is built from the two localhost
 * sentinel addresses on ports 65432 and 65431.
 */
@Test(expected = JedisConnectionException.class)
public void initializeWithNotAvailableSentinelsShouldThrowException() {
    Set<String> unreachableSentinels = new HashSet<String>();
    for (int port : new int[] {65432, 65431}) {
        unreachableSentinels.add(new HostAndPort("localhost", port).toString());
    }

    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, unreachableSentinels);
    pool.destroy();
}
/**
 * Runs a simple set/get round trip on a borrowed connection, then closes the pool and checks
 * that it reports itself closed.
 *
 * @throws Exception if closing the pool fails
 */
@Test
public void checkCloseableConnections() throws Exception {
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
            "foobared", 2);
    try {
        Jedis jedis = pool.getResource();
        try {
            jedis.auth("foobared");
            jedis.set("foo", "bar");
            assertEquals("bar", jedis.get("foo"));
        } finally {
            // close() hands the connection back to its pool; used instead of the deprecated
            // pool.returnResource(jedis).
            jedis.close();
        }
    } finally {
        // Close the pool even when a command or assertion above fails.
        pool.close();
    }
    assertTrue(pool.isClosed());
}
@Test public void ensureSafeTwiceFailover() throws InterruptedException { JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, new GenericObjectPoolConfig(), 2000, "foobared", 2); forceFailover(pool); // after failover sentinel needs a bit of time to stabilize before a new // failover Thread.sleep(100); forceFailover(pool); // you can test failover as much as possible }
@Test public void returningBorrowedInstanceBeforeFailoverShouldNotAffectBorrowing() throws InterruptedException { final JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, new GenericObjectPoolConfig(), 2000, "foobared", 2); Jedis borrowed = pool.getResource(); forceFailover(pool); Thread.sleep(1000); // returns instance which was borrowed before failover borrowed.close(); final AtomicBoolean isBorrowed = new AtomicBoolean(false); Thread t = new Thread(new Runnable() { @Override public void run() { pool.getResource(); isBorrowed.set(true); } }); t.start(); // wait for 5 secs t.join(5000); assertTrue(isBorrowed.get()); }
/**
 * Opens a MULTI block on a connection, returns the connection to the pool without executing
 * it, and checks that the next borrow yields the same instance with the pre-transaction value
 * still stored under {@code hello}.
 *
 * <p>A {@code JedisConnectionException} is tolerated: the second connection, if obtained, is
 * handed back as broken.
 */
@Test
public void returnResourceShouldResetState() {
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxTotal(1);
    poolConfig.setBlockWhenExhausted(false);
    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, poolConfig, 1000,
            "foobared", 2);

    Jedis first = pool.getResource();
    Jedis second = null;
    try {
        first.set("hello", "jedis");

        // queue a write inside MULTI but never EXEC it
        Transaction pending = first.multi();
        pending.set("hello", "world");
        pool.returnResource(first);

        second = pool.getResource();
        assertTrue(first == second);
        assertEquals("jedis", second.get("hello"));
    } catch (JedisConnectionException e) {
        if (second != null) {
            pool.returnBrokenResource(second);
            second = null;
        }
    } finally {
        if (second != null) {
            second.close();
        }
        pool.destroy();
    }
}
/**
 * Waits until a new master has been promoted for {@code MASTER_NAME}, then waits until the
 * given pool has recognized that new master.
 *
 * @param pool the sentinel pool expected to switch to the new master
 * @param oldMaster the master before the failover (not read by this method)
 * @throws InterruptedException if interrupted while waiting
 */
private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster)
        throws InterruptedException {
    waitForJedisSentinelPoolRecognizeNewMaster(
            pool,
            JedisSentinelTestUtil.waitForNewPromotedMaster(MASTER_NAME, sentinelJedis1,
                    sentinelJedis2));
}
/**
 * Creates the sentinel pool from the configured server list.
 *
 * <p>The server list is split on {@code |}; each piece is used as one sentinel address.
 *
 * @return a new {@link JedisSentinelPool} for the configured master name
 * @throws IllegalArgumentException if no server address is configured
 * @throws Exception declared by the overridden method
 */
@Override
public JedisSentinelPool initPool() throws Exception {
    if (Check.isNullOrEmpty(getServers())) {
        throw new IllegalArgumentException("未指定redis服务器地址");
    }

    String[] entries = getServers().split("\\|");
    Set<String> sentinelAddresses = new HashSet<>();
    for (int i = 0; i < entries.length; i++) {
        sentinelAddresses.add(entries[i]);
    }

    JedisPoolConfig poolConfig = super.getPoolConfig();
    return new JedisSentinelPool(getMasterName(), sentinelAddresses, poolConfig);
}
/**
 * Builds a sentinel pool for master {@code mymaster} from three local sentinel addresses and
 * runs the shared pool test against it.
 *
 * @throws Exception if the shared pool test fails
 */
@Test
public void testSentinel() throws Exception {
    Set<String> sentinelAddresses = new HashSet<>();
    sentinelAddresses.add("127.0.0.1:26379");
    sentinelAddresses.add("127.0.0.1:26380");
    sentinelAddresses.add("127.0.0.1:26381");

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMinIdle(2);
    poolConfig.setMaxIdle(10);
    poolConfig.setMaxTotal(10);

    JedisSentinelPool sentinelPool = new JedisSentinelPool("mymaster", sentinelAddresses,
            poolConfig);
    testWithPool(sentinelPool);
}
/**
 * Builds container for Redis Sentinel environment.
 *
 * <p>The pool limits (max idle, max total, min idle) are copied from the given config; master
 * name, sentinels, timeouts, password and database are passed to the sentinel pool.
 *
 * @param jedisSentinelConfig configuration for JedisSentinel
 * @return container for Redis sentinel environment
 * @throws NullPointerException if jedisSentinelConfig is null
 */
public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
    Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
    poolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
    poolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());

    return new RedisContainer(new JedisSentinelPool(
            jedisSentinelConfig.getMasterName(),
            jedisSentinelConfig.getSentinels(),
            poolConfig,
            jedisSentinelConfig.getConnectionTimeout(),
            jedisSentinelConfig.getSoTimeout(),
            jedisSentinelConfig.getPassword(),
            jedisSentinelConfig.getDatabase()));
}
/**
 * Prepares the sentinel configuration from the cluster's sentinel hosts and creates a sentinel
 * pool from that configuration.
 */
@Before
public void setUp() {
    Set<String> sentinelHosts = JedisUtil.sentinelHosts(cluster);
    jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
            .setMasterName(REDIS_MASTER)
            .setSentinels(sentinelHosts)
            .build();

    String masterName = jedisSentinelConfig.getMasterName();
    Set<String> configuredSentinels = jedisSentinelConfig.getSentinels();
    jedisSentinelPool = new JedisSentinelPool(masterName, configuredSentinels);
}
@Test public void testSentinelExample() { JedisSentinelPool sentinelPool = null; // 使用默认配置 // sentinelPool = ClientBuilder.redisSentinel(appId).build(); /** * 自定义配置 */ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 3); poolConfig.setMinIdle(GenericObjectPoolConfig.DEFAULT_MIN_IDLE * 2); poolConfig.setJmxEnabled(true); poolConfig.setMaxWaitMillis(3000); sentinelPool = ClientBuilder.redisSentinel(appId) .setPoolConfig(poolConfig) .setConnectionTimeout(2000) .setSoTimeout(1000) .build(); Jedis jedis = sentinelPool.getResource(); jedis.set("key1", "1"); assertEquals("2", jedis.incr("key1")); jedis.close(); }
/**
 * Runs a simple set/get round trip on a borrowed connection, then closes the pool and checks
 * that it reports itself closed.
 *
 * @throws Exception if closing the pool fails
 */
@Test
public void checkCloseableConnections() throws Exception {
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
            "foobared", 2);
    try {
        Jedis jedis = pool.getResource();
        try {
            jedis.auth("foobared");
            jedis.set("foo", "bar");
            assertEquals("bar", jedis.get("foo"));
        } finally {
            // Return the connection even when a command or assertion fails.
            jedis.close();
        }
    } finally {
        // Close the pool on every path so a failing test does not leak it.
        pool.close();
    }
    assertTrue(pool.isClosed());
}
@Test public void ensureSafeTwiceFailover() throws InterruptedException { JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, new GenericObjectPoolConfig(), 1000, "foobared", 2); forceFailover(pool); // after failover sentinel needs a bit of time to stabilize before a new // failover Thread.sleep(100); forceFailover(pool); // you can test failover as much as possible }
/**
 * Opens a MULTI block on a connection, closes the connection without executing it, and checks
 * that the next borrow yields the same instance with the pre-transaction value still stored
 * under {@code hello}.
 *
 * <p>A {@code JedisConnectionException} is tolerated, as before; the second connection is
 * still released in {@code finally}.
 */
@Test
public void returnResourceShouldResetState() {
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    config.setMaxTotal(1);
    config.setBlockWhenExhausted(false);
    JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
            "foobared", 2);

    Jedis jedis = pool.getResource();
    Jedis jedis2 = null;
    try {
        jedis.set("hello", "jedis");
        // queue a write inside MULTI but never EXEC it
        Transaction t = jedis.multi();
        t.set("hello", "world");
        jedis.close();

        jedis2 = pool.getResource();
        assertTrue(jedis == jedis2);
        assertEquals("jedis", jedis2.get("hello"));
    } catch (JedisConnectionException ignored) {
        // Connection failures are tolerated by this test. jedis2 is deliberately NOT set to
        // null here, so the finally block can still close it.
        // NOTE(review): assumes Jedis.close() copes with a broken connection - confirm.
    } finally {
        // jedis2 is null when the second borrow itself failed; guard against an NPE.
        if (jedis2 != null) {
            jedis2.close();
        }
        pool.destroy();
    }
}
/**
 * Adds a sentinel on the given host for the application.
 *
 * <p>Looks up the master name from the application's sentinel instances (the first non-blank
 * {@code cmd} value), resolves the host and port of the connection handed out by the
 * application's sentinel pool, and then starts the sentinel through {@code runSentinel}.
 *
 * @param appId id of the application
 * @param sentinelHost host on which the sentinel should run
 * @return {@code true} if the sentinel was started; {@code false} if no sentinel pool is
 *     available, the current master could not be determined, or starting the sentinel failed
 */
@Override
public boolean addSentinel(long appId, String sentinelHost) {
    AppDesc appDesc = appDao.getAppDescById(appId);
    JedisSentinelPool jedisSentinelPool = redisCenter.getJedisSentinelPool(appDesc);
    if (jedisSentinelPool == null) {
        return false;
    }

    String masterName = null;
    String masterHost = null;
    Integer masterPort = null;
    try {
        // Read-only scan: the list returned by the DAO is left untouched.
        List<InstanceInfo> instanceInfos = instanceDao.getInstListByAppId(appId);
        for (InstanceInfo instanceInfo : instanceInfos) {
            if (instanceInfo.getType() == ConstUtils.CACHE_REDIS_SENTINEL
                    && StringUtils.isNotBlank(instanceInfo.getCmd())) {
                masterName = instanceInfo.getCmd();
                break;
            }
        }

        Jedis jedis = null;
        try {
            jedis = jedisSentinelPool.getResource();
            masterHost = jedis.getClient().getHost();
            masterPort = jedis.getClient().getPort();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            // Without the master's address there is nothing to start a sentinel against.
            return false;
        } finally {
            // jedis is still null when getResource() itself failed.
            if (jedis != null) {
                jedis.close();
            }
        }
    } finally {
        // Destroy the pool on every path, including a failing DAO lookup.
        jedisSentinelPool.destroy();
    }

    return runSentinel(appDesc, sentinelHost, masterName, masterHost, masterPort);
}
/**
 * Builds container for Redis Sentinel environment.
 *
 * <p>The pool limits (max idle, max total, min idle) are copied from the given config; master
 * name, sentinels, timeouts, password and database are passed to the sentinel pool.
 *
 * @param jedisSentinelConfig configuration for JedisSentinel
 * @return container for Redis sentinel environment
 * @throws NullPointerException if jedisSentinelConfig is null
 */
public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
    Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
    poolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
    poolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());

    return new RedisContainer(new JedisSentinelPool(
            jedisSentinelConfig.getMasterName(),
            jedisSentinelConfig.getSentinels(),
            poolConfig,
            jedisSentinelConfig.getConnectionTimeout(),
            jedisSentinelConfig.getSoTimeout(),
            jedisSentinelConfig.getPassword(),
            jedisSentinelConfig.getDatabase()));
}