@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { super.sendMessage(destination, msg, payload); } catch (ChannelException cx) { FaultyMember[] mbrs = cx.getFaultyMembers(); for (int i = 0; i < mbrs.length; i++) { if (mbrs[i].getCause() != null && (!(mbrs[i].getCause() instanceof RemoteProcessException))) {// RemoteProcessException's // are // ok this.memberDisappeared(mbrs[i].getMember()); } // end if } // for throw cx; } }
/**
 * Handles an incoming RPC request.
 * <p>
 * A {@link ClusteringMessage} is executed against the configuration context
 * and its response is returned. Anything else, including {@code null}, is
 * rejected.
 *
 * @param msg     the received RPC payload
 * @param invoker the member that sent the request (not used by this method)
 * @return the response produced by the executed clustering message
 * @throws IllegalArgumentException if {@code msg} is {@code null} or is not a
 *                                  {@code ClusteringMessage}
 * @throws RemoteProcessException   if executing the message raises a
 *                                  {@code ClusteringFault}
 */
public Serializable replyRequest(Serializable msg, Member invoker) {
    if (log.isDebugEnabled()) {
        log.debug("RPC request received by RpcMessagingHandler");
    }

    if (!(msg instanceof ClusteringMessage)) {
        // instanceof is false for null, so guard the getClass() call: a null
        // payload must be reported as an invalid message, not as an NPE.
        Class<?> msgType = (msg == null) ? null : msg.getClass();
        throw new IllegalArgumentException("Invalid RPC message of type " + msgType + " received");
    }

    ClusteringMessage clusteringMsg = (ClusteringMessage) msg;
    try {
        clusteringMsg.execute(configurationContext);
    } catch (ClusteringFault e) {
        String errMsg = "Cannot handle RPC message";
        log.error(errMsg, e);
        throw new RemoteProcessException(errMsg, e);
    }
    return clusteringMsg.getResponse();
}
/**
 * Sends a MEMBER_LIST command, built from a copy of the currently known
 * members, to a single member over the RPC membership channel.
 *
 * @param member The member to whom the member list has to be sent
 * @throws RemoteProcessException if anything goes wrong while building or
 *                                sending the command; the failure is logged
 *                                at error level first
 */
public void sendMemberList(Member member) {
    try {
        MemberListCommand command = new MemberListCommand();

        // Work on a copy so the array is built from a stable list.
        List<Member> snapshot = new ArrayList<Member>(this.members);
        Member[] memberArray = snapshot.toArray(new Member[snapshot.size()]);
        command.setMembers(memberArray);

        int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION;
        Member[] target = new Member[]{member};
        rpcMembershipChannel.send(target, command, RpcChannel.ALL_REPLY, sendOptions, 10000);

        if (log.isDebugEnabled()) {
            log.debug("Sent MEMBER_LIST to " + TribesUtil.getName(member));
        }
    } catch (Exception e) {
        String errMsg = "Could not send MEMBER_LIST to member " + TribesUtil.getName(member);
        log.error(errMsg, e);
        throw new RemoteProcessException(errMsg, e);
    }
}
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { super.sendMessage(destination, msg, payload); }catch ( ChannelException cx ) { FaultyMember[] mbrs = cx.getFaultyMembers(); for ( int i=0; i<mbrs.length; i++ ) { if ( mbrs[i].getCause()!=null && (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok this.memberDisappeared(mbrs[i].getMember()); }//end if }//for throw cx; } }
/**
 * Waits for the acknowledgement from the other server.
 * <p>
 * Bytes are read one at a time from {@code soIn} into {@code ackbuf} until
 * the buffer holds a complete package, the stream ends, or
 * {@code Constants.ACK_COMMAND.length} bytes have been consumed. A package
 * equal to either the ACK or the FAIL_ACK data counts as an acknowledgement.
 * <p>
 * FIXME Please, not wait only for three characters, better control that the
 * wait ack message is correct.
 *
 * @throws java.io.IOException if the stream ended or the bytes read did not
 *         form a recognised acknowledgement; before re-throwing, a ready
 *         sender state for the destination is marked suspect
 * @throws java.net.SocketTimeoutException
 * @throws RemoteProcessException if a failed ack was received and
 *         {@code getThrowOnFailedAck()} is true
 */
protected void waitForAck() throws java.io.IOException {
    try {
        ackbuf.clear();
        boolean gotAck = false;
        boolean gotFailAck = false;
        int consumed = 0;
        int next = soIn.read();
        while (next != -1 && consumed < Constants.ACK_COMMAND.length) {
            consumed++;
            ackbuf.append((byte) next);
            if (ackbuf.doesPackageExist()) {
                byte[] reply = ackbuf.extractDataPackage(true).getBytes();
                gotFailAck = Arrays.equals(reply, org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
                // A failed ack is still an ack as far as the protocol read is concerned.
                gotAck = gotFailAck
                        || Arrays.equals(reply, org.apache.catalina.tribes.transport.Constants.ACK_DATA);
                break;
            }
            next = soIn.read();
        }

        if (!gotAck) {
            // Distinguish "peer closed the stream" from "peer sent something else".
            String key = (next == -1) ? "IDataSender.ack.eof" : "IDataSender.ack.wrong";
            throw new IOException(sm.getString(key, getAddress(), Integer.valueOf(socket.getLocalPort())));
        }
        if (gotFailAck && getThrowOnFailedAck()) {
            throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
        }
    } catch (IOException x) {
        String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),
                Integer.valueOf(socket.getLocalPort()), Long.valueOf(getTimeout()));
        if (!SenderState.getSenderState(getDestination()).isReady()) {
            if (log.isDebugEnabled()) {
                log.debug(errmsg, x);
            }
        } else {
            SenderState.getSenderState(getDestination()).setSuspect();
            if (log.isWarnEnabled()) {
                log.warn(errmsg, x);
            }
        }
        throw x;
    } finally {
        ackbuf.clear();
    }
}
/**
 * Wait for Acknowledgement from other server.
 * <p>
 * Reads single bytes from {@code soIn} into {@code ackbuf} until a complete
 * package exists, end of stream is reached, or
 * {@code Constants.ACK_COMMAND.length} bytes have been read. The extracted
 * package is accepted when it equals the ACK data or the FAIL_ACK data.
 * <p>
 * FIXME Please, not wait only for three characters, better control that the
 * wait ack message is correct.
 *
 * @throws java.io.IOException if no valid acknowledgement was read (end of
 *         stream or unexpected bytes); a ready sender state for the
 *         destination is marked suspect before the exception is re-thrown
 * @throws java.net.SocketTimeoutException
 * @throws RemoteProcessException if a failed ack was received and
 *         {@code getThrowOnFailedAck()} is true
 */
protected void waitForAck() throws java.io.IOException {
    try {
        boolean ackReceived = false;
        boolean failAckReceived = false;
        ackbuf.clear();
        int bytesRead = 0;
        int i = soIn.read();
        while ((i != -1) && (bytesRead < Constants.ACK_COMMAND.length)) {
            bytesRead++;
            byte d = (byte) i;
            ackbuf.append(d);
            if (ackbuf.doesPackageExist()) {
                byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
                ackReceived = Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.ACK_DATA);
                failAckReceived = Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
                // A failed ack still terminates the wait successfully.
                ackReceived = ackReceived || failAckReceived;
                break;
            }
            i = soIn.read();
        }
        if (!ackReceived) {
            // valueOf(...) replaces the deprecated boxing constructors.
            if (i == -1) {
                throw new IOException(sm.getString("IDataSender.ack.eof", getAddress(),
                        Integer.valueOf(socket.getLocalPort())));
            } else {
                throw new IOException(sm.getString("IDataSender.ack.wrong", getAddress(),
                        Integer.valueOf(socket.getLocalPort())));
            }
        } else if (failAckReceived && getThrowOnFailedAck()) {
            throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
        }
    } catch (IOException x) {
        String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),
                Integer.valueOf(socket.getLocalPort()), Long.valueOf(getTimeout()));
        if (SenderState.getSenderState(getDestination()).isReady()) {
            // First failure for a healthy destination: flag it and warn.
            SenderState.getSenderState(getDestination()).setSuspect();
            if (log.isWarnEnabled()) {
                log.warn(errmsg, x);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug(errmsg, x);
            }
        }
        throw x;
    } finally {
        ackbuf.clear();
    }
}
/** * Inform all members that a particular member just joined * * @param member The member who just joined */ public void sendMemberJoinedToAll(Member member) { try { MemberJoinedCommand cmd = new MemberJoinedCommand(); cmd.setMember(member); ArrayList<Member> membersToSend = (ArrayList<Member>) (((ArrayList) members).clone()); membersToSend.remove(member); // Do not send MEMBER_JOINED to the new member who just joined if (membersToSend.size() > 0) { rpcMembershipChannel.send(membersToSend.toArray(new Member[membersToSend.size()]), cmd, RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); if (log.isDebugEnabled()) { log.debug("Sent MEMBER_JOINED[" + TribesUtil.getName(member) + "] to all members in domain " + new String(domain)); } } } catch (Exception e) { String errMsg = "Could not send MEMBER_JOINED[" + TribesUtil.getName(member) + "] to all members "; log.error(errMsg, e); throw new RemoteProcessException(errMsg, e); } }
/** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object * @param value Object * @return Member - the backup node * @throws ChannelException */ @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (! (key instanceof Serializable && value instanceof Serializable) ) return new Member[0]; //select a backup node Member[] backup = getMapMembers(); if (backup == null || backup.length == 0) return null; try { //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } catch (ChannelException e) { FaultyMember[] faultyMembers = e.getFaultyMembers(); if (faultyMembers.length == 0) throw e; ArrayList<Member> faulty = new ArrayList<Member>(); for (FaultyMember faultyMember : faultyMembers) { if (!(faultyMember.getCause() instanceof RemoteProcessException)) { faulty.add(faultyMember.getMember()); } } Member[] realFaultyMembers = faulty.toArray(new Member[faulty.size()]); if (realFaultyMembers.length != 0) { backup = excludeFromSet(realFaultyMembers, backup); if (backup.length == 0) { throw e; } else { if (log.isWarnEnabled()) { log.warn("Unable to replicate backup key:" + key + ". Success nodes:" + Arrays.toString(backup) + ". Failed nodes:" + Arrays.toString(realFaultyMembers), e); } } } } return backup; }
/** * publish info about a map pair (key/value) to other nodes in the cluster * * @param key * Object * @param value * Object * @return Member - the backup node * @throws ChannelException */ @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; // select a backup node Member[] backup = getMapMembers(); if (backup == null || backup.length == 0) return null; try { // publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } catch (ChannelException e) { FaultyMember[] faultyMembers = e.getFaultyMembers(); if (faultyMembers.length == 0) throw e; ArrayList<Member> faulty = new ArrayList<Member>(); for (FaultyMember faultyMember : faultyMembers) { if (!(faultyMember.getCause() instanceof RemoteProcessException)) { faulty.add(faultyMember.getMember()); } } Member[] realFaultyMembers = faulty.toArray(new Member[faulty.size()]); if (realFaultyMembers.length != 0) { backup = excludeFromSet(realFaultyMembers, backup); if (backup.length == 0) { throw e; } else { if (log.isWarnEnabled()) { log.warn("Unable to replicate backup key:" + key + ". Success nodes:" + Arrays.toString(backup) + ". Failed nodes:" + Arrays.toString(realFaultyMembers), e); } } } } return backup; }
/**
 * Blocks until the other server acknowledges the data that was sent.
 * <p>
 * The reply is read byte by byte from {@code soIn} and accumulated in
 * {@code ackbuf}. Reading stops at end of stream, once
 * {@code Constants.ACK_COMMAND.length} bytes have been consumed, or as soon
 * as the buffer contains a full package. That package must equal the ACK or
 * the FAIL_ACK data to count as an acknowledgement.
 * <p>
 * FIXME Please, not wait only for three characters, better control that the
 * wait ack message is correct.
 *
 * @throws java.io.IOException
 *             if the stream ended or an unrecognised reply was read; a
 *             ready sender state for the destination is marked suspect
 *             before the exception is re-thrown
 * @throws java.net.SocketTimeoutException
 * @throws RemoteProcessException
 *             if a failed ack was received and
 *             {@code getThrowOnFailedAck()} is true
 */
protected void waitForAck() throws java.io.IOException {
    try {
        boolean acknowledged = false;
        boolean failedAck = false;
        ackbuf.clear();

        int current = soIn.read();
        for (int consumed = 0; current != -1 && consumed < Constants.ACK_COMMAND.length; consumed++) {
            ackbuf.append((byte) current);
            if (ackbuf.doesPackageExist()) {
                byte[] received = ackbuf.extractDataPackage(true).getBytes();
                boolean plainAck = Arrays.equals(received,
                        org.apache.catalina.tribes.transport.Constants.ACK_DATA);
                failedAck = Arrays.equals(received,
                        org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
                acknowledged = plainAck || failedAck;
                break;
            }
            current = soIn.read();
        }

        if (acknowledged) {
            if (failedAck && getThrowOnFailedAck()) {
                throw new RemoteProcessException(
                        "Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
            }
        } else if (current == -1) {
            // The stream ended before a complete acknowledgement arrived.
            throw new IOException(
                    sm.getString("IDataSender.ack.eof", getAddress(), Integer.valueOf(socket.getLocalPort())));
        } else {
            // Bytes arrived but they were not a recognised acknowledgement.
            throw new IOException(
                    sm.getString("IDataSender.ack.wrong", getAddress(), Integer.valueOf(socket.getLocalPort())));
        }
    } catch (IOException x) {
        String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),
                Integer.valueOf(socket.getLocalPort()), Long.valueOf(getTimeout()));
        boolean destinationReady = SenderState.getSenderState(getDestination()).isReady();
        if (destinationReady) {
            SenderState.getSenderState(getDestination()).setSuspect();
            if (log.isWarnEnabled()) {
                log.warn(errmsg, x);
            }
        } else if (log.isDebugEnabled()) {
            log.debug(errmsg, x);
        }
        throw x;
    } finally {
        ackbuf.clear();
    }
}
/** * Send MEMBER_LIST message to WKA member * * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent * @return true - if the WKA member belongs to the domain of this local member */ private boolean sendMemberListToWellKnownMember(Member wkaMember) { /*if (wkaMember.isFailing() || wkaMember.isSuspect()) { return false; }*/ // send the member list to it MemberListCommand memListCmd; try { memListCmd = new MemberListCommand(); List<Member> members = new ArrayList<Member>(this.members); members.add(localMember); // Need to set the local member too memListCmd.setMembers(members.toArray(new Member[members.size()])); Response[] responses = rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd, RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); // Once a response is received from the WKA member to the MEMBER_LIST message, // if it does not belong to this domain, simply remove it from the members if (responses != null && responses.length > 0 && responses[0] != null) { nonRespondingWkaMembers.remove(wkaMember); Member source = responses[0].getSource(); if (!TribesUtil.areInSameDomain(source, wkaMember)) { if (log.isDebugEnabled()) { log.debug("WKA Member " + TribesUtil.getName(source) + " does not belong to local domain " + new String(domain) + ". Hence removing it from the list."); } return false; } } else { // No response from WKA member if(nonRespondingWkaMembers.add(wkaMember)){ log.debug("No response from WKA member added to the list of non-responsive members : " + wkaMember); } // we still don't know the details of this WKA member, hence should return false return false; } } catch (Exception e) { String errMsg = "Could not send MEMBER_LIST to well-known member " + TribesUtil.getName(wkaMember); log.error(errMsg, e); throw new RemoteProcessException(errMsg, e); } return true; }