protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //no leadership change if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); handleViewConf(msg,local,merged); } else { //membership change suggestedView = new Membership(local,AbsoluteOrder.comp,true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView,merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } else { //leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } }
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE, this, "Pre merge")); MemberImpl local = (MemberImpl) getLocalMember(false); Membership merged = new Membership(local, AbsoluteOrder.comp, true); Arrays.fill(merged, msg.getMembers()); Arrays.fill(merged, getMembers()); Member[] diff = Arrays.diff(merged, membership, local); for (int i = 0; i < diff.length; i++) { if (!alive(diff[i])) merged.removeMember((MemberImpl) diff[i]); else memberAdded(diff[i], false); } fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE, this, "Post merge")); return merged; }
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (local.equals(msg.getLeader())) { // no leadership change if (Arrays.sameMembers(msg.getMembers(), merged.getMembers())) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(), local), createData(msg, local), null); handleViewConf(msg, local, merged); } else { // membership change suggestedView = new Membership(local, AbsoluteOrder.comp, true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView, merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } } else { // leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (viewId != null && msg.getId().equals(viewId)) return;// we already have this view view = new Membership((MemberImpl) getLocalMember(false), AbsoluteOrder.comp, true); Arrays.fill(view, msg.getMembers()); viewId = msg.getId(); if (viewId.equals(suggestedviewId)) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0], merged.getMembers()[0]) < 0) { suggestedView = null; suggestedviewId = null; } viewChange(viewId, view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX, this, "Accepted View")); if (suggestedviewId == null && hasHigherPriority(merged.getMembers(), membership.getMembers())) { startElection(false); } }
private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { Membership m = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(m,others); MemberImpl[] mbrs = m.getMembers(); m.reset(); CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), COORD_REQUEST); return msg; }
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); MemberImpl local = (MemberImpl)getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(merged,msg.getMembers()); Arrays.fill(merged,getMembers()); Member[] diff = Arrays.diff(merged,membership,local); for ( int i=0; i<diff.length; i++ ) { if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); else memberAdded(diff[i],false); } fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge")); return merged; }
protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { if ( !coordMsgReceived.get() ) { coordMsgReceived.set(true); synchronized (electionMutex) { electionMutex.notifyAll();} } msg.timestamp = System.currentTimeMillis(); Membership merged = mergeOnArrive(msg, sender); if (isViewConf(msg)) handleViewConf(msg, sender, merged); else handleToken(msg, sender, merged); }
protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { MemberImpl local = (MemberImpl)getLocalMember(false); if ( local.equals(msg.getSource()) ) { //my message msg.src=local handleMyToken(local, msg, sender,merged); } else { handleOtherToken(local, msg, sender,merged); } }
protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //I am the new leader //startElection(false); } else { msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
public static Member[] diff(Membership complete, Membership local, MemberImpl ignore) { ArrayList<Member> result = new ArrayList<Member>(); MemberImpl[] comp = complete.getMembers(); for ( int i=0; i<comp.length; i++ ) { if ( ignore!=null && ignore.equals(comp[i]) ) continue; if ( local.getMember(comp[i]) == null ) result.add(comp[i]); } return result.toArray(new MemberImpl[result.size()]); }
private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { Membership m = new Membership(local, AbsoluteOrder.comp, true); Arrays.fill(m, others); MemberImpl[] mbrs = m.getMembers(); m.reset(); CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs, new UniqueId(UUIDGenerator.randomUUID(true)), COORD_REQUEST); return msg; }
protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { if (!coordMsgReceived.get()) { coordMsgReceived.set(true); synchronized (electionMutex) { electionMutex.notifyAll(); } } msg.timestamp = System.currentTimeMillis(); Membership merged = mergeOnArrive(msg, sender); if (isViewConf(msg)) handleViewConf(msg, sender, merged); else handleToken(msg, sender, merged); }
protected void handleToken(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { MemberImpl local = (MemberImpl) getLocalMember(false); if (local.equals(msg.getSource())) { // my message msg.src=local handleMyToken(local, msg, sender, merged); } else { handleOtherToken(local, msg, sender, merged); } }
protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (local.equals(msg.getLeader())) { // I am the new leader // startElection(false); } else { msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } }
public static Member[] diff(Membership complete, Membership local, MemberImpl ignore) { ArrayList<Member> result = new ArrayList<Member>(); MemberImpl[] comp = complete.getMembers(); for (int i = 0; i < comp.length; i++) { if (ignore != null && ignore.equals(comp[i])) continue; if (local.getMember(comp[i]) == null) result.add(comp[i]); } return result.toArray(new MemberImpl[result.size()]); }