/**
 * Builds the majority verifier from a set of configuration properties.
 *
 * Every property whose key starts with {@code server.} describes one
 * member: the text after the prefix is parsed as the decimal server id and
 * the property value is handed to {@link QuorumServer}. Members are
 * recorded in {@code allMembers} and then in either {@code votingMembers}
 * (participants) or {@code observingMembers} (everything else). A property
 * named {@code version} is parsed as a hexadecimal number. All other
 * properties are ignored.
 *
 * @param props configuration properties to scan
 * @throws ConfigException declared by this constructor; presumably raised
 *         by {@link QuorumServer} for a malformed server value (confirm
 *         against that class)
 * @throws NumberFormatException if a server id or the version value
 *         cannot be parsed
 */
public QuorumMaj(Properties props) throws ConfigException {
    final String serverPrefix = "server.";

    for (Entry<Object, Object> property : props.entrySet()) {
        String name = property.getKey().toString();
        String setting = property.getValue().toString();

        if (name.startsWith(serverPrefix)) {
            // The id is whatever follows the first dot, i.e. the prefix.
            long sid = Long.parseLong(name.substring(serverPrefix.length()));
            QuorumServer member = new QuorumServer(sid, setting);
            Long memberId = Long.valueOf(sid);

            allMembers.put(memberId, member);
            if (member.type != LearnerType.PARTICIPANT) {
                observingMembers.put(memberId, member);
            } else {
                votingMembers.put(memberId, member);
            }
            continue;
        }

        if (name.equals("version")) {
            version = Long.parseLong(setting, 16);
        }
    }

    // Cached so that the majority threshold is not recomputed per check.
    half = votingMembers.size() / 2;
}
private void verifyElectionTimeTakenJMXAttribute(List<QuorumPeer> peers) throws Exception { LOG.info("Verify QuorumPeer#electionTimeTaken jmx bean attribute"); for (int i = 1; i <= peers.size(); i++) { QuorumPeer qp = peers.get(i - 1); if (qp.getLearnerType() == LearnerType.OBSERVER) { continue; // Observer don't have electionTimeTaken attribute. } Long electionTimeTaken = -1L; String bean = ""; if (qp.getPeerState() == ServerState.FOLLOWING) { bean = String.format( "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Follower", CommonNames.DOMAIN, i, i); } else if (qp.getPeerState() == ServerState.LEADING) { bean = String.format( "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Leader", CommonNames.DOMAIN, i, i); } electionTimeTaken = (Long) JMXEnv.ensureBeanAttribute(bean, "ElectionTimeTaken"); Assert.assertTrue("Wrong electionTimeTaken value!", electionTimeTaken >= 0); } }
/** * lets the leader know that a follower is capable of following and is done * syncing * * @param handler handler of the follower * @return last proposed zxid */ synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) { // Queue up any outstanding requests enabling the receipt of // new requests if (lastProposed > lastSeenZxid) { for (Proposal p : toBeApplied) { if (p.packet.getZxid() <= lastSeenZxid) { continue; } handler.queuePacket(p.packet); // Since the proposal has been committed we need to send the // commit message also QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet .getZxid(), null, null); handler.queuePacket(qp); } // Only participant need to get outstanding proposals if (handler.getLearnerType() == LearnerType.PARTICIPANT) { List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet()); Collections.sort(zxids); for (Long zxid: zxids) { if (zxid <= lastSeenZxid) { continue; } handler.queuePacket(outstandingProposals.get(zxid).packet); } } } if (handler.getLearnerType() == LearnerType.PARTICIPANT) { addForwardingFollower(handler); } else { addObserverLearnerHandler(handler); } return lastProposed; }
/**
 * Picks the learning state for this server. A learning state is either
 * FOLLOWING or OBSERVING, and the choice depends only on the role of the
 * server: participants follow, every other learner type observes.
 *
 * @return {@code ServerState.FOLLOWING} for a participant,
 *         {@code ServerState.OBSERVING} otherwise
 */
private ServerState learningState() {
    boolean participant = self.getLearnerType() == LearnerType.PARTICIPANT;
    if (!participant) {
        LOG.debug("I'm an observer: " + self.getId());
        return ServerState.OBSERVING;
    }

    LOG.debug("I'm a participant: " + self.getId());
    return ServerState.FOLLOWING;
}
/**
 * Returns the server identifier this peer starts voting with.
 *
 * @return this server's id when it is a participant,
 *         {@code Long.MIN_VALUE} for any other learner type
 */
private long getInitId() {
    return (self.getLearnerType() == LearnerType.PARTICIPANT)
            ? self.getId()
            : Long.MIN_VALUE;
}
/**
 * Returns the last logged zxid this peer starts voting with.
 *
 * @return this server's last logged zxid when it is a participant,
 *         {@code Long.MIN_VALUE} for any other learner type
 */
private long getInitLastLoggedZxid() {
    if (self.getLearnerType() != LearnerType.PARTICIPANT) {
        return Long.MIN_VALUE;
    }
    return self.getLastLoggedZxid();
}
/**
 * Returns the initial vote value of the peer epoch.
 *
 * @return this server's current epoch when it is a participant,
 *         {@code Long.MIN_VALUE} for any other learner type
 * @throws RuntimeException if reading the current epoch fails with an
 *         {@link IOException}; the original exception is attached as the
 *         cause
 */
private long getPeerEpoch() {
    if (self.getLearnerType() != LearnerType.PARTICIPANT) {
        return Long.MIN_VALUE;
    }

    try {
        return self.getCurrentEpoch();
    } catch (IOException e) {
        // Chain the IOException instead of copying only its message and
        // stack trace, so its concrete type and any nested causes stay
        // available to whoever handles or logs the failure.
        throw new RuntimeException(e.getMessage(), e);
    }
}
/**
 * Builds the majority verifier from an already-parsed member map and
 * caches the majority threshold so it is not recomputed on every check.
 *
 * The supplied map is stored as-is (not copied) in {@code allMembers}.
 * Each member is also indexed by its id in {@code votingMembers} when it
 * is a participant, or in {@code observingMembers} otherwise.
 *
 * @param allMembers every known quorum member
 */
public QuorumMaj(Map<Long, QuorumServer> allMembers) {
    this.allMembers = allMembers;

    for (QuorumServer member : allMembers.values()) {
        Long memberId = Long.valueOf(member.id);
        if (member.type != LearnerType.PARTICIPANT) {
            observingMembers.put(memberId, member);
        } else {
            votingMembers.put(memberId, member);
        }
    }

    half = votingMembers.size() / 2;
}
/** * lets the leader know that a follower is capable of following and is done * syncing * * @param handler handler of the follower * @return last proposed zxid * @throws InterruptedException */ synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) { // Queue up any outstanding requests enabling the receipt of // new requests if (lastProposed > lastSeenZxid) { for (Proposal p : toBeApplied) { if (p.packet.getZxid() <= lastSeenZxid) { continue; } handler.queuePacket(p.packet); // Since the proposal has been committed we need to send the // commit message also QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet .getZxid(), null, null); handler.queuePacket(qp); } // Only participant need to get outstanding proposals if (handler.getLearnerType() == LearnerType.PARTICIPANT) { List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet()); Collections.sort(zxids); for (Long zxid: zxids) { if (zxid <= lastSeenZxid) { continue; } handler.queuePacket(outstandingProposals.get(zxid).packet); } } } if (handler.getLearnerType() == LearnerType.PARTICIPANT) { addForwardingFollower(handler); } else { addObserverLearnerHandler(handler); } return lastProposed; }
/**
 * Decides which learning state this server enters. The learning state is
 * FOLLOWING for a participant and OBSERVING for every other learner type.
 *
 * @return the {@code ServerState} matching the role of this server
 */
private ServerState learningState() {
    final ServerState state;

    if (self.getLearnerType() != LearnerType.PARTICIPANT) {
        LOG.debug("I'm an observer: " + self.getId());
        state = ServerState.OBSERVING;
    } else {
        LOG.debug("I'm a participant: " + self.getId());
        state = ServerState.FOLLOWING;
    }

    return state;
}
/**
 * Returns the initial vote value of the peer epoch.
 *
 * @return the current epoch of this server if it is a participant,
 *         otherwise {@code Long.MIN_VALUE}
 * @throws RuntimeException wrapping the {@link IOException} raised while
 *         reading the current epoch; the original exception is kept as
 *         the cause
 */
private long getPeerEpoch() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        try {
            return self.getCurrentEpoch();
        } catch (IOException e) {
            // Keep the IOException as the cause: copying just the message
            // and stack trace would discard its type and nested causes.
            throw new RuntimeException(e.getMessage(), e);
        }
    } else {
        return Long.MIN_VALUE;
    }
}
private void setupPeerType() { // Warn about inconsistent peer type LearnerType roleByServersList = quorumVerifier.getObservingMembers().containsKey(serverId) ? LearnerType.OBSERVER : LearnerType.PARTICIPANT; if (roleByServersList != peerType) { LOG.warn("Peer type from servers list (" + roleByServersList + ") doesn't match peerType (" + peerType + "). Defaulting to servers list."); peerType = roleByServersList; } }
/** * lets the leader know that a follower is capable of following and is done * syncing * * @param handler handler of the follower * @return last proposed zxid */ synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) { // Queue up any outstanding requests enabling the receipt of // new requests if (lastProposed > lastSeenZxid) { for (Proposal p : toBeApplied) { if (p.packet.getZxid() <= lastSeenZxid) { continue; } handler.queuePacket(p.packet); // Since the proposal has been committed we need to send the // commit message also QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet .getZxid(), null, null); handler.queuePacket(qp); } List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet()); Collections.sort(zxids); for (Long zxid: zxids) { if (zxid <= lastSeenZxid) { continue; } handler.queuePacket(outstandingProposals.get(zxid).packet); } } if (handler.getLearnerType() == LearnerType.PARTICIPANT) { addForwardingFollower(handler); } else { addObserverLearnerHandler(handler); } return lastProposed; }
/** * lets the leader know that a follower is capable of following and is done * syncing * * @param handler handler of the follower * @return last proposed zxid */ synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) { // Queue up any outstanding requests enabling the receipt of // new requests if (lastProposed > lastSeenZxid) { for (Proposal p : toBeApplied) { if (p.packet.getZxid() <= lastSeenZxid) { continue; } handler.queuePacket(p.packet); // Since the proposal has been committed we need to send the // commit message also QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet .getZxid(), null, null); handler.queuePacket(qp); } List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet()); Collections.sort(zxids); for (Long zxid: zxids) { if (zxid <= lastSeenZxid) { continue; } handler.queuePacket(outstandingProposals.get(zxid).packet); } } if (handler.getLearnerType() == LearnerType.PARTICIPANT) { synchronized (forwardingFollowers) { forwardingFollowers.add(handler); } } else { synchronized (observingLearners) { observingLearners.add(handler); } } return lastProposed; }