/**
 * Forwards the election message to the member located after {@code local} in
 * {@code msg.getMembers()}, moving on to the following member each time a send fails.
 * <p>
 * Before sending, {@code msg.leader} is set to the first entry of the message's
 * member array. Nothing is sent when {@code Arrays.nextIndex} yields a negative index.
 *
 * @param local the local member, used as the starting point and as the sender
 * @param msg   the election message to pass along; its {@code leader} field is overwritten
 * @throws ChannelException the failure of the last attempt, once the search has
 *                          come back around to the first candidate index
 */
protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException {
    final int firstCandidate = Arrays.nextIndex(local, msg.getMembers());
    msg.leader = msg.getMembers()[0];

    int candidate = firstCandidate;
    while (candidate >= 0) {
        try {
            sendElectionMsg(local, msg.getMembers()[candidate], msg);
            // Delivered: nothing more to do.
            return;
        } catch (ChannelException x) {
            log.warn("Unable to send election message to:" + msg.getMembers()[candidate]);
            candidate = Arrays.nextIndex(msg.getMembers()[candidate], msg.getMembers());
            if (candidate == firstCandidate) {
                // Every candidate has been tried; give up with the latest failure.
                throw x;
            }
        }
    }
}
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //no leadership change if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); handleViewConf(msg,local,merged); } else { //membership change suggestedView = new Membership(local,AbsoluteOrder.comp,true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView,merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } else { //leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } }
@Override public void messageReceived(ChannelMessage msg) { if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { //ignore message, its an alive message fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message")); } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { try { CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); Member[] cmbr = cmsg.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")")); processCoordMessage(cmsg, msg.getAddress()); }catch ( ChannelException x ) { log.error("Error processing coordination message. Could be fatal.",x); } } else { super.messageReceived(msg); } }
/**
 * Periodic check that the installed {@code view} still matches {@code membership}.
 * <p>
 * When a view exists and {@code Arrays.diff} reports a difference in either
 * direction, and {@code isHighest()} returns true, an {@code EVT_START_ELECT}
 * event is fired and {@code startElection(true)} is called. Any exception raised
 * along the way is logged; {@code super.heartbeat()} is always invoked afterwards.
 */
@Override
public void heartbeat() {
    try {
        MemberImpl self = (MemberImpl) getLocalMember(false);
        // The second diff is only computed when the first one finds nothing.
        boolean outOfSync = view != null
                && (Arrays.diff(view, membership, self).length != 0
                    || Arrays.diff(membership, view, self).length != 0);
        if (outOfSync && isHighest()) {
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
                    "Heartbeat found inconsistency, restart election"));
            startElection(true);
        }
    } catch (Exception x) {
        log.error("Unable to perform heartbeat.", x);
    } finally {
        super.heartbeat();
    }
}
/**
 * Prints usage, applies command-line options to the static settings, then
 * initialises the demo and waits for input.
 * <p>
 * Recognised options (each value option consumes the following argument):
 * <ul>
 * <li>{@code -c} sets {@code CHANNEL_COUNT} (parsed as int)</li>
 * <li>{@code -t} sets {@code MULTI_THREAD} (parsed as boolean)</li>
 * <li>{@code -s} sets {@code SLEEP_TIME} (parsed as long)</li>
 * <li>{@code -sc} sets {@code CLEAR_SCREEN} (parsed as int)</li>
 * <li>{@code -p} passes its value to {@code setEvents}</li>
 * <li>{@code -h} exits the JVM with status 0</li>
 * </ul>
 * Unrecognised arguments are skipped. All entries of {@code VIEW_EVENTS} are set
 * to {@code true} before the options are read.
 *
 * @param args command-line arguments
 * @param demo the demo instance to initialise and run
 * @throws Exception if option parsing or the demo itself fails
 */
public static void run(String[] args, CoordinationDemo demo) throws Exception {
    usage();
    java.util.Arrays.fill(VIEW_EVENTS, true);

    int pos = 0;
    while (pos < args.length) {
        String flag = args[pos];
        if ("-c".equals(flag)) {
            CHANNEL_COUNT = Integer.parseInt(args[++pos]);
        } else if ("-t".equals(flag)) {
            MULTI_THREAD = Boolean.parseBoolean(args[++pos]);
        } else if ("-s".equals(flag)) {
            SLEEP_TIME = Long.parseLong(args[++pos]);
        } else if ("-sc".equals(flag)) {
            CLEAR_SCREEN = Integer.parseInt(args[++pos]);
        } else if ("-p".equals(flag)) {
            setEvents(args[++pos]);
        } else if ("-h".equals(flag)) {
            System.exit(0);
        }
        pos++;
    }

    demo.init();
    demo.waitForInput();
}
@SuppressWarnings("unused") public static void main(String[] args) throws Exception { if (args.length==0) usage(); main = Thread.currentThread(); ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args); Properties props = new Properties(); props.setProperty("mydomainkey","mydomainvalue"); props.setProperty("someotherkey", Arrays.toString(UUIDGenerator.randomUUID(true))); new MembersWithProperties(channel, props); channel.start(Channel.DEFAULT); Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); try { Thread.sleep(Long.MAX_VALUE); }catch(InterruptedException ix) { Thread.sleep(5000);//allow everything to shutdown } }
@Override public boolean equals(Object other) { boolean result = (other instanceof UniqueId); if (result) { UniqueId uid = (UniqueId) other; if (this.id == null && uid.id == null) result = true; else if (this.id == null && uid.id != null) result = false; else if (this.id != null && uid.id == null) result = false; else result = Arrays.equals(this.id, uid.id); } // end if return result; }
/**
 * Sends a message to members of the cluster.
 * <p>
 * When the message options contain {@code Channel.SEND_OPTIONS_MULTICAST} the
 * message is broadcast through the membership service; otherwise it is handed to
 * the cluster sender together with the destination array. If trace logging is
 * enabled on {@code Logs.MESSAGES}, the send is recorded afterwards.
 *
 * @param destination
 *            Member[] - the destinations; {@code null} is replaced by
 *            {@code membershipService.getMembers()}. NOTE(review): earlier
 *            documentation stated that a zero-length array also means "all", but
 *            this method passes an empty array through unchanged - confirm
 *            against callers.
 * @param msg
 *            ChannelMessage - the message to send
 * @param payload
 *            not read by this method
 * @throws ChannelException
 *             if the underlying send fails
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (destination == null) {
        destination = membershipService.getMembers();
    }

    boolean multicast =
            (msg.getOptions() & Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST;
    if (multicast) {
        membershipService.broadcast(msg);
    } else {
        clusterSender.sendMessage(msg, destination);
    }

    if (Logs.MESSAGES.isTraceEnabled()) {
        Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId())
                + " at " + new java.sql.Timestamp(System.currentTimeMillis())
                + " to " + Arrays.toNameString(destination));
    }
}
/**
 * Forwards the election message to the member located after {@code local} in
 * {@code msg.getMembers()}, moving on to the following member each time a send fails.
 * <p>
 * Before sending, {@code msg.leader} is set to the first entry of the message's
 * member array. Nothing is sent when {@code Arrays.nextIndex} yields a negative index.
 *
 * @param local the local member, used as the starting point and as the sender
 * @param msg   the election message to pass along; its {@code leader} field is overwritten
 * @throws ChannelException the failure of the last attempt, once the search has
 *                          come back around to the first candidate index
 */
protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException {
    final int firstCandidate = Arrays.nextIndex(local, msg.getMembers());
    msg.leader = msg.getMembers()[0];

    int candidate = firstCandidate;
    while (candidate >= 0) {
        try {
            sendElectionMsg(local, msg.getMembers()[candidate], msg);
            // Delivered: nothing more to do.
            return;
        } catch (ChannelException x) {
            log.warn("Unable to send election message to:" + msg.getMembers()[candidate]);
            candidate = Arrays.nextIndex(msg.getMembers()[candidate], msg.getMembers());
            if (candidate == firstCandidate) {
                // Every candidate has been tried; give up with the latest failure.
                throw x;
            }
        }
    }
}
/**
 * Builds a merged membership from the members listed in an incoming message and
 * the members returned by {@code getMembers()}.
 * <p>
 * Each member that {@code Arrays.diff} reports for the merged set versus the
 * current {@code membership} is checked with {@code alive}: members that pass are
 * announced through {@code memberAdded(member, false)}, the others are removed
 * from the merged set. {@code EVT_PRE_MERGE} and {@code EVT_POST_MERGE} events are
 * fired before and after the merge.
 *
 * @param msg    the incoming coordination message supplying members
 * @param sender not read by this method
 * @return the merged membership
 */
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE, this, "Pre merge"));

    MemberImpl local = (MemberImpl) getLocalMember(false);
    Membership merged = new Membership(local, AbsoluteOrder.comp, true);
    Arrays.fill(merged, msg.getMembers());
    Arrays.fill(merged, getMembers());

    Member[] unknownMembers = Arrays.diff(merged, membership, local);
    for (Member candidate : unknownMembers) {
        if (alive(candidate)) {
            memberAdded(candidate, false);
        } else {
            merged.removeMember((MemberImpl) candidate);
        }
    }

    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE, this, "Post merge"));
    return merged;
}
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (local.equals(msg.getLeader())) { // no leadership change if (Arrays.sameMembers(msg.getMembers(), merged.getMembers())) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(), local), createData(msg, local), null); handleViewConf(msg, local, merged); } else { // membership change suggestedView = new Membership(local, AbsoluteOrder.comp, true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView, merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } } else { // leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (viewId != null && msg.getId().equals(viewId)) return;// we already have this view view = new Membership((MemberImpl) getLocalMember(false), AbsoluteOrder.comp, true); Arrays.fill(view, msg.getMembers()); viewId = msg.getId(); if (viewId.equals(suggestedviewId)) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0], merged.getMembers()[0]) < 0) { suggestedView = null; suggestedviewId = null; } viewChange(viewId, view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX, this, "Accepted View")); if (suggestedviewId == null && hasHigherPriority(merged.getMembers(), membership.getMembers())) { startElection(false); } }
@Override public void messageReceived(ChannelMessage msg) { if (Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_ALIVE, 0, COORD_ALIVE.length)) { // ignore message, its an alive message fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this, "Alive Message")); } else if (Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_HEADER, 0, COORD_HEADER.length)) { try { CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); Member[] cmbr = cmsg.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this, "Coord Msg Arrived(" + Arrays.toNameString(cmbr) + ")")); processCoordMessage(cmsg, msg.getAddress()); } catch (ChannelException x) { log.error("Error processing coordination message. Could be fatal.", x); } } else { super.messageReceived(msg); } }
/**
 * Periodic check that the installed {@code view} still matches {@code membership}.
 * <p>
 * When a view exists and {@code Arrays.diff} reports a difference in either
 * direction, and {@code isHighest()} returns true, an {@code EVT_START_ELECT}
 * event is fired and {@code startElection(true)} is called. Any exception raised
 * along the way is logged; {@code super.heartbeat()} is always invoked afterwards.
 */
@Override
public void heartbeat() {
    try {
        MemberImpl self = (MemberImpl) getLocalMember(false);
        // The second diff is only computed when the first one finds nothing.
        boolean outOfSync = view != null
                && (Arrays.diff(view, membership, self).length != 0
                    || Arrays.diff(membership, view, self).length != 0);
        if (outOfSync && isHighest()) {
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
                    "Heartbeat found inconsistency, restart election"));
            startElection(true);
        }
    } catch (Exception x) {
        log.error("Unable to perform heartbeat.", x);
    } finally {
        super.heartbeat();
    }
}