private static void processError(final RedisInputStream is) { String message = is.readLine(); // TODO: I'm not sure if this is the best way to do this. // Maybe Read only first 5 bytes instead? if (message.startsWith(MOVED_RESPONSE)) { String[] movedInfo = parseTargetHostAndSlot(message); throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0])); } else if (message.startsWith(ASK_RESPONSE)) { String[] askInfo = parseTargetHostAndSlot(message); throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0])); } else if (message.startsWith(CLUSTERDOWN_RESPONSE)) { throw new JedisClusterException(message); } else if (message.startsWith(BUSY_RESPONSE)) { throw new JedisBusyException(message); } else if (message.startsWith(NOSCRIPT_RESPONSE)) { throw new JedisNoScriptException(message); } throw new JedisDataException(message); }
@Test public void testReadonly() throws Exception { node1.clusterMeet(localHost, nodeInfoSlave2.getPort()); JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2); for (String nodeInfo : node2.clusterNodes().split("\n")) { if (nodeInfo.contains("myself")) { nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]); break; } } try { nodeSlave2.get("test"); fail(); } catch (JedisMovedDataException e) { } nodeSlave2.readonly(); nodeSlave2.get("test"); nodeSlave2.clusterReset(Reset.SOFT); nodeSlave2.flushDB(); }
private static void processError(final RedisInputStream is) { String message = is.readLine(); // TODO: I'm not sure if this is the best way to do this. // Maybe Read only first 5 bytes instead? if (message.startsWith(MOVED_RESPONSE)) { String[] movedInfo = parseTargetHostAndSlot(message); throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0])); } else if (message.startsWith(ASK_RESPONSE)) { String[] askInfo = parseTargetHostAndSlot(message); throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0])); } else if (message.startsWith(CLUSTERDOWN_RESPONSE)) { throw new JedisClusterException(message); } throw new JedisDataException(message); }
protected boolean checkException(Object obj) { if (obj instanceof Exception) { Exception e = (Exception) obj; if (e instanceof JedisRedirectionException) { //重定向slot 映射. if (e instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(); logger.warn("JedisMovedDataException:" + e.getMessage(), e); } else { logger.error("pipeline-error:" + e.getMessage(), e); } } else { logger.error(e.getMessage(), e); } return true; } return false; }
/** * 处理Redis集群重定向"请求错误"响应。 * * @param is */ private static void processError(RedisInputStream is) { String message = is.readLine(); // TODO: I'm not sure if this is the best way to do this. // Maybe Read only first 5 bytes instead? if (message.startsWith(MOVED_RESPONSE)) { String[] movedInfo = parseTargetHostAndSlot(message); throw new JedisMovedDataException(message, new HostAndPort( movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0])); } else if (message.startsWith(ASK_RESPONSE)) { String[] askInfo = parseTargetHostAndSlot(message); throw new JedisAskDataException(message, new HostAndPort( askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0])); } else if (message.startsWith(CLUSTERDOWN_RESPONSE)) { throw new JedisClusterException(message); } throw new JedisDataException(message); }
@Test public void testMovedExceptionParameters() { try { node1.set("foo", "bar"); } catch (JedisMovedDataException jme) { assertEquals(12182, jme.getSlot()); assertEquals(new HostAndPort("127.0.0.1", 7381), jme.getTargetNode()); return; } fail(); }
private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。 // 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同说明所有的client都已经添加,就不用再调用add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } }
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { if (attempts <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // release current connection before recursion releaseConnection(connection); connection = null; if (attempts <= 1) { // We need this because if node is not reachable anymore - we need to finally // initiate slots renewing, // or we can stuck with cluster state without one node in opposite case. // But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed // request. // TODO make tracking of successful/unsuccessful operations for node - do // renewing only // if there were no successful responses from this node last few seconds this.connectionHandler.renewSlotCache(); // no more redirections left, throw original exception, not // JedisClusterMaxRedirectionsException, because it's not MOVED situation throw jce; } return runWithRetries(key, attempts - 1, tryRandomNode, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, attempts - 1, false, asking); } finally { releaseConnection(connection); } }
@Test(expected = JedisMovedDataException.class) public void testThrowMovedException() { node1.set("foo", "bar"); }
private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(); } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { JedisClusterMaxRedirectionsException exception = new JedisClusterMaxRedirectionsException("Too many Cluster redirections? key=" + SafeEncoder.encode(key)); //收集 UsefulDataCollector.collectException(exception, "", System.currentTimeMillis(), ClientExceptionType.REDIS_CLUSTER); throw exception; } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }