/** * Mark the token as complete and ready for users to be notified. * * @param msg response message. Optional - there are no response messages for some flows * @param ex if there was a problem store the exception in the token. */ protected void markComplete(MqttWireMessage msg, MqttException ex) { final String methodName = "markComplete"; //@TRACE 404=>key={0} response={1} excep={2} log.fine(className, methodName, "404", new Object[]{getKey(), msg, ex}); synchronized (responseLock) { // ACK means that everything was OK, so mark the message for garbage collection. if (msg instanceof MqttAck) { this.message = null; } this.pendingComplete = true; this.response = msg; this.exception = ex; } }
private MqttWireMessage restoreMessage(String key, MqttPersistable persistable) throws MqttException { final String methodName = "restoreMessage"; MqttWireMessage message = null; try { message = MqttWireMessage.createWireMessage(persistable); } catch (MqttException ex) { //@TRACE 602=key={0} exception log.fine(CLASS_NAME, methodName, "602", new Object[] {key}, ex); if (ex.getCause() instanceof EOFException) { // Premature end-of-file means that the message is corrupted if (key != null) { persistence.remove(key); } } else { throw ex; } } //@TRACE 601=key={0} message={1} log.fine(CLASS_NAME, methodName, "601", new Object[]{key,message}); return message; }
protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) { final String methodName = "notifyResult"; // unblock any threads waiting on the token token.internalTok.markComplete(ack, ex); // Let the user know an async operation has completed and then remove the token if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) { //@TRACE 648=key{0}, msg={1}, excep={2} log.fine(CLASS_NAME,methodName, "648", new Object [] {token.internalTok.getKey(), ack, ex}); callback.asyncOperationComplete(token); } // There are cases where there is no ack as the operation failed before // an ack was received if (ack == null ) { //@TRACE 649=key={0},excep={1} log.fine(CLASS_NAME,methodName, "649", new Object [] { token.internalTok.getKey(), ex}); callback.asyncOperationComplete(token); } }
/** * Mark the token as complete and ready for users to be notified. * @param msg response message. Optional - there are no response messages for some flows * @param ex if there was a problem store the exception in the token. */ protected void markComplete(MqttWireMessage msg, MqttException ex) { final String methodName = "markComplete"; //@TRACE 404=>key={0} response={1} excep={2} log.fine(CLASS_NAME,methodName,"404",new Object[]{getKey(),msg,ex}); synchronized(responseLock) { // ACK means that everything was OK, so mark the message for garbage collection. if (msg instanceof MqttAck) { this.message = null; } this.pendingComplete = true; this.response = msg; this.exception = ex; } }
/**
 * Returns the response of the first token that is set, checked in the order
 * {@code connect2Token}, {@code disconnectToken}, {@code connect1Token}.
 *
 * @return that token's response, or {@code null} when none of the tokens is set
 */
@Override
public MqttWireMessage getResponse() {
    if (connect2Token != null) {
        return connect2Token.getResponse();
    }
    if (disconnectToken != null) {
        return disconnectToken.getResponse();
    }
    if (connect1Token != null) {
        return connect1Token.getResponse();
    }
    // no token available to ask
    return null;
}
private void handleRunException(MqttWireMessage message, Exception ex) { final String methodName = "handleRunException"; //@TRACE 804=exception log.fine(className, methodName, "804", null, ex); MqttException mex; if (!(ex instanceof MqttException)) { mex = new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ex); } else { mex = (MqttException) ex; } running = false; clientComms.shutdownConnection(null, mex); }
protected void saveToken(MqttToken token, MqttWireMessage message) throws MqttException { final String methodName = "saveToken"; synchronized (tokens) { if (closedResponse == null) { String key = message.getKey(); //@TRACE 300=key={0} message={1} log.fine(className, methodName, "300", new Object[]{key, message}); saveToken(token, key); } else { throw closedResponse; } } }
public void waitForCompletion(long timeout) throws MqttException { final String methodName = "waitForCompletion"; //@TRACE 407=key={0} wait max={1} token={2} log.fine(className, methodName, "407", new Object[]{getKey(), new Long(timeout), this}); MqttWireMessage resp = waitForResponse(timeout); if (resp == null && !completed) { //@TRACE 406=key={0} timed out token={1} log.fine(className, methodName, "406", new Object[]{getKey(), this}); throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT); } checkResult(); }
protected MqttWireMessage waitForResponse(long timeout) throws MqttException { final String methodName = "waitForResponse"; synchronized (responseLock) { //@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6} log.fine(className, methodName, "400", new Object[]{getKey(), new Long(timeout), new Boolean(sent), new Boolean(completed), (exception == null) ? "false" : "true", response, this}, exception); if (!this.completed) { if (this.exception == null) { try { //@TRACE 408=key={0} wait max={1} log.fine(className, methodName, "408", new Object[]{getKey(), new Long(timeout)}); if (timeout == -1) { responseLock.wait(); } else { responseLock.wait(timeout); } } catch (InterruptedException e) { exception = new MqttException(e); } } if (!this.completed) { if (this.exception != null) { //@TRACE 401=failed with exception log.fine(className, methodName, "401", null, exception); throw exception; } } } } //@TRACE 402=key={0} response={1} log.fine(className, methodName, "402", new Object[]{getKey(), this.response}); return this.response; }
/**
 * Adds a message to a list that is kept sorted by ascending message id.
 * <p>
 * The message is placed in front of the first element whose id is greater than its
 * own, or at the end when there is no such element.
 *
 * @param list   the list to insert the message into
 * @param newMsg the message to insert into the list
 */
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
    final int newMsgId = newMsg.getMessageId();

    // Find the position of the first element with a larger id
    int position = 0;
    while (position < list.size()) {
        final MqttWireMessage candidate = (MqttWireMessage) list.elementAt(position);
        if (candidate.getMessageId() > newMsgId) {
            break;
        }
        position++;
    }

    // position == list.size() appends to the end
    list.insertElementAt(newMsg, position);
}
/** * Called by the CommsSender when a message has been sent * @param message */ protected void notifySent(MqttWireMessage message) { final String methodName = "notifySent"; this.lastOutboundActivity = System.currentTimeMillis(); //@TRACE 625=key={0} log.fine(CLASS_NAME,methodName,"625",new Object[]{message.getKey()}); MqttToken token = tokenStore.getToken(message); token.internalTok.notifySent(); if (message instanceof MqttPingReq) { synchronized (pingOutstandingLock) { long time = System.currentTimeMillis(); synchronized (pingOutstandingLock) { lastPing = time; pingOutstanding++; } //@TRACE 635=ping sent. pingOutstanding: {0} log.fine(CLASS_NAME,methodName,"635",new Object[]{ new Integer(pingOutstanding)}); } } else if (message instanceof MqttPublish) { if (((MqttPublish)message).getMessage().getQos() == 0) { // once a QoS 0 message is sent we can clean up its records straight away as // we won't be hearing about it again token.internalTok.markComplete(null, null); callback.asyncOperationComplete(token); decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); checkQuiesceLock(); } } }
private void handleRunException(MqttWireMessage message, Exception ex) { final String methodName = "handleRunException"; //@TRACE 804=exception log.fine(CLASS_NAME,methodName,"804",null, ex); MqttException mex; if ( !(ex instanceof MqttException)) { mex = new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ex); } else { mex = (MqttException)ex; } running = false; clientComms.shutdownConnection(null, mex); }
protected void saveToken(MqttToken token, MqttWireMessage message) throws MqttException { final String methodName = "saveToken"; synchronized(tokens) { if (closedResponse == null) { String key = message.getKey(); //@TRACE 300=key={0} message={1} log.fine(CLASS_NAME,methodName,"300",new Object[]{key, message}); saveToken(token,key); } else { throw closedResponse; } } }
public void waitForCompletion(long timeout) throws MqttException { final String methodName = "waitForCompletion"; //@TRACE 407=key={0} wait max={1} token={2} log.fine(CLASS_NAME,methodName, "407",new Object[]{getKey(), new Long(timeout), this}); MqttWireMessage resp = waitForResponse(timeout); if (resp == null && !completed) { //@TRACE 406=key={0} timed out token={1} log.fine(CLASS_NAME,methodName, "406",new Object[]{getKey(), this}); exception = new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT); throw exception; } checkResult(); }
protected MqttWireMessage waitForResponse(long timeout) throws MqttException { final String methodName = "waitForResponse"; synchronized (responseLock) { //@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6} log.fine(CLASS_NAME, methodName, "400",new Object[]{getKey(), new Long(timeout),new Boolean(sent),new Boolean(completed),(exception==null)?"false":"true",response,this},exception); while (!this.completed) { if (this.exception == null) { try { //@TRACE 408=key={0} wait max={1} log.fine(CLASS_NAME,methodName,"408",new Object[] {getKey(),new Long(timeout)}); if (timeout <= 0) { responseLock.wait(); } else { responseLock.wait(timeout); } } catch (InterruptedException e) { exception = new MqttException(e); } } if (!this.completed) { if (this.exception != null) { //@TRACE 401=failed with exception log.fine(CLASS_NAME,methodName,"401",null,exception); throw exception; } if (timeout > 0) { // time up and still not completed break; } } } } //@TRACE 402=key={0} response={1} log.fine(CLASS_NAME,methodName, "402",new Object[]{getKey(), this.response}); return this.response; }
/** * Sends a message to the server. Does not check if connected this validation must be done * by invoking routines. * @param message * @param token * @throws MqttException */ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "internalSend"; //@TRACE 200=internalSend key={0} message={1} token={2} log.fine(CLASS_NAME, methodName, "200", new Object[]{message.getKey(), message, token}); if (token.getClient() == null ) { // Associate the client with the token - also marks it as in use. token.internalTok.setClient(getClient()); } else { // Token is already in use - cannot reuse //@TRACE 213=fail: token in use: key={0} message={1} token={2} log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token}); throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE); } try { // Persist if needed and send the message this.clientState.send(message, token); } catch(MqttException e) { if (message instanceof MqttPublish) { this.clientState.undo((MqttPublish)message); } throw e; } }
/** * Sends a message to the broker if in connected state, but only waits for the message to be * stored, before returning. */ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "sendNoWait"; if (isConnected() || (!isConnected() && message instanceof MqttConnect) || (isDisconnecting() && message instanceof MqttDisconnect)) { this.internalSend(message, token); } else { //@TRACE 208=failed: not connected log.fine(CLASS_NAME, methodName, "208"); throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } }
/**
 * Returns the response wire message by forwarding the call to the delegate.
 *
 * @return whatever the delegate's {@code getResponse()} returns
 */
@Override
public MqttWireMessage getResponse() {
    final MqttWireMessage delegated = delegate.getResponse();
    return delegated;
}
@Override @Nullable public MqttWireMessage getResponse() { // nothing return null; }
/** * Run loop to receive messages from the server. */ public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { //@TRACE 852=network read message log.fine(className, methodName, "852"); MqttWireMessage message = in.readMqttWireMessage(); if (message instanceof MqttAck) { token = tokenStore.getToken(message); if (token != null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck) message); } } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { // A new message has arrived clientState.notifyReceivedMsg(message); } } catch (MqttException ex) { //@TRACE 856=Stopping, MQttException log.fine(className, methodName, "856", null, ex); running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { //@TRACE 853=Stopping due to IOException log.fine(className, methodName, "853"); running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } // else { } } //@TRACE 854=< log.fine(className, methodName, "854"); }
/**
 * Removes the token stored under the key of the given message.
 *
 * @param message the message whose token is to be removed; may be {@code null}
 * @return the result of removing by key, or {@code null} when no message was given
 */
public MqttToken removeToken(MqttWireMessage message) {
    if (message == null) {
        return null;
    }
    final String key = message.getKey();
    return removeToken(key);
}
/**
 * Returns the wire message held in the {@code response} field.
 *
 * @return the current value of {@code response}
 */
public MqttWireMessage getWireMessage() {
    return this.response;
}
/** * Run loop to receive messages from the server. */ public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { //@TRACE 852=network read message log.fine(CLASS_NAME,methodName,"852"); receiving = in.available() > 0; MqttWireMessage message = in.readMqttWireMessage(); receiving = false; if (message instanceof MqttAck) { token = tokenStore.getToken(message); if (token!=null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck)message); } } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { // A new message has arrived clientState.notifyReceivedMsg(message); } } catch (MqttException ex) { //@TRACE 856=Stopping, MQttException log.fine(CLASS_NAME,methodName,"856",null,ex); running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { //@TRACE 853=Stopping due to IOException log.fine(CLASS_NAME,methodName,"853"); running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } } finally { receiving = false; } } //@TRACE 854=< log.fine(CLASS_NAME,methodName,"854"); }
/**
 * Builds the persistence key for a sent message.
 *
 * @param message the message whose id forms the key
 * @return {@code PERSISTENCE_SENT_PREFIX} followed by the message id
 */
private String getSendPersistenceKey(MqttWireMessage message) {
    final int messageId = message.getMessageId();
    return PERSISTENCE_SENT_PREFIX + messageId;
}
/**
 * Builds the persistence key for a confirmed sent message.
 *
 * @param message the message whose id forms the key
 * @return {@code PERSISTENCE_CONFIRMED_PREFIX} followed by the message id
 */
private String getSendConfirmPersistenceKey(MqttWireMessage message) {
    final int messageId = message.getMessageId();
    return PERSISTENCE_CONFIRMED_PREFIX + messageId;
}
/**
 * Builds the persistence key for a received message.
 *
 * @param message the message whose id forms the key
 * @return {@code PERSISTENCE_RECEIVED_PREFIX} followed by the message id
 */
private String getReceivedPersistenceKey(MqttWireMessage message) {
    final int messageId = message.getMessageId();
    return PERSISTENCE_RECEIVED_PREFIX + messageId;
}
/** * Produces a new list with the messages properly ordered according to their message id's. * @param list the list containing the messages to produce a new reordered list for * - this will not be modified or replaced, i.e., be read-only to this method * @return a new reordered list */ private Vector reOrder(Vector list) { // here up the new list Vector newList = new Vector(); if (list.size() == 0) { return newList; // nothing to reorder } int previousMsgId = 0; int largestGap = 0; int largestGapMsgIdPosInList = 0; for (int i = 0; i < list.size(); i++) { int currentMsgId = ((MqttWireMessage) list.elementAt(i)).getMessageId(); if (currentMsgId - previousMsgId > largestGap) { largestGap = currentMsgId - previousMsgId; largestGapMsgIdPosInList = i; } previousMsgId = currentMsgId; } int lowestMsgId = ((MqttWireMessage) list.elementAt(0)).getMessageId(); int highestMsgId = previousMsgId; // last in the sorted list // we need to check that the gap after highest msg id to the lowest msg id is not beaten if (MAX_MSG_ID - highestMsgId + lowestMsgId > largestGap) { largestGapMsgIdPosInList = 0; } // starting message has been located, let's start from this point on for (int i = largestGapMsgIdPosInList; i < list.size(); i++) { newList.addElement(list.elementAt(i)); } // and any wrapping back to the beginning for (int i = 0; i < largestGapMsgIdPosInList; i++) { newList.addElement(list.elementAt(i)); } return newList; }
/** * Called by the CommsReceiver when a message has been received. * Handles inbound messages and other flows such as PUBREL. * * @param message * @throws MqttException */ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException { final String methodName = "notifyReceivedMsg"; this.lastInboundActivity = System.currentTimeMillis(); // @TRACE 651=received key={0} message={1} log.fine(CLASS_NAME, methodName, "651", new Object[] { new Integer(message.getMessageId()), message }); if (!quiescing) { if (message instanceof MqttPublish) { MqttPublish send = (MqttPublish) message; switch (send.getMessage().getQos()) { case 0: case 1: if (callback != null) { callback.messageArrived(send); } break; case 2: persistence.put(getReceivedPersistenceKey(message), (MqttPublish) message); inboundQoS2.put(new Integer(send.getMessageId()), send); this.send(new MqttPubRec(send), null); break; default: //should NOT reach here } } else if (message instanceof MqttPubRel) { MqttPublish sendMsg = (MqttPublish) inboundQoS2 .get(new Integer(message.getMessageId())); if (sendMsg != null) { if (callback != null) { callback.messageArrived(sendMsg); } } else { // Original publish has already been delivered. MqttPubComp pubComp = new MqttPubComp(message .getMessageId()); this.send(pubComp, null); } } } }
/** * Called when waiters and callbacks have processed the message. For * messages where delivery is complete the message can be removed from * persistence and counters adjusted accordingly. Also tidy up by removing * token from store... * * @param message * @throws MqttException */ protected void notifyComplete(MqttToken token) throws MqttException { final String methodName = "notifyComplete"; MqttWireMessage message = token.internalTok.getWireMessage(); if (message != null && message instanceof MqttAck) { // @TRACE 629=received key={0} token={1} message={2} log.fine(CLASS_NAME, methodName, "629", new Object[] { new Integer(message.getMessageId()), token, message }); MqttAck ack = (MqttAck) message; if (ack instanceof MqttPubAck) { // QoS 1 - user notified now remove from persistence... persistence.remove(getSendPersistenceKey(message)); outboundQoS1.remove(new Integer(ack.getMessageId())); decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); // @TRACE 650=removed Qos 1 publish. key={0} log.fine(CLASS_NAME, methodName, "650", new Object[] { new Integer(ack.getMessageId()) }); } else if (ack instanceof MqttPubComp) { // QoS 2 - user notified now remove from persistence... persistence.remove(getSendPersistenceKey(message)); persistence.remove(getSendConfirmPersistenceKey(message)); outboundQoS2.remove(new Integer(ack.getMessageId())); inFlightPubRels--; decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); // @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1} log.fine(CLASS_NAME, methodName, "645", new Object[] { new Integer(ack.getMessageId()), new Integer(inFlightPubRels) }); } checkQuiesceLock(); } }
/**
 * Returns the wire message held in the {@code response} field.
 *
 * @return the current value of {@code response}
 */
public MqttWireMessage getResponse() {
    return this.response;
}
/**
 * Returns the response wire message by forwarding the call to the internal token.
 *
 * @return whatever {@code internalTok.getResponse()} returns
 */
public MqttWireMessage getResponse() {
    final MqttWireMessage fromInternalToken = internalTok.getResponse();
    return fromInternalToken;
}
/**
 * Looks up the token associated with a message, using the message's key.
 *
 * @param message the message whose token is to be returned
 * @return the token stored under the message's key, or null if one does not exist
 */
public MqttToken getToken(MqttWireMessage message) {
    final Object stored = tokens.get(message.getKey());
    return (MqttToken) stored;
}
/**
 * Waits for the message delivery to complete by calling
 * {@code waitForResponse(long)} with a timeout of {@code -1}.
 * <p>
 * Unlike a wait for completion, a NACK does not cause an exception here; an
 * exception is still thrown if something else goes wrong (e.g. an IOException).
 * This is used for packets like CONNECT, which have useful information in the ACK
 * that needs to be accessed.
 *
 * @return the response obtained from {@code waitForResponse(long)}
 * @throws MqttException if the wait fails
 */
protected MqttWireMessage waitForResponse() throws MqttException {
    final long noTimeout = -1;
    return waitForResponse(noTimeout);
}
/**
 * Returns the response wire message.
 *
 * @return the response wire message
 *         (NOTE(review): implementations presumably may return {@code null} when
 *         no response is available - confirm against the implementing classes)
 */
public MqttWireMessage getResponse();
/**
 * Looks up the token associated with a message, using the message's key.
 *
 * @param message the message whose token is to be returned
 * @return the token stored under the message's key, or null if one does not exist
 */
public MqttToken getToken(MqttWireMessage message) {
    final Object stored = tokens.get(message.getKey());
    return (MqttToken) stored;
}