public MqttToken removeToken(String key) { final String methodName = "removeToken"; //@TRACE 306=key={0} log.fine(className, methodName, "306", new Object[]{key}); if (key != null) { synchronized (tokens) { MqttToken tok = (MqttToken) tokens.get(key); if (tok != null) { synchronized (tok) { return (MqttToken) tokens.remove(key); } } } } return null; }
/** * @param persistence * @param client * @param comms * @param options * @param userToken * @param userContext * @param userCallback */ public ConnectActionListener( MqttAsyncClient client, MqttClientPersistence persistence, ClientComms comms, MqttConnectOptions options, MqttToken userToken, Object userContext, IMqttActionListener userCallback) { this.persistence = persistence; this.client = client; this.comms = comms; this.options = options; this.userToken = userToken; this.userContext = userContext; this.userCallback = userCallback; this.originalMqttVersion = options.getMqttVersion(); }
/** * Start the connect processing * @throws MqttPersistenceException */ public void connect() throws MqttPersistenceException { MqttToken token = new MqttToken(client.getClientId()); token.setActionCallback(this); token.setUserContext(this); persistence.open(client.getClientId(), client.getServerURI()); if (options.isCleanSession()) { persistence.clear(); } if (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_DEFAULT) { options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); } try { comms.connect(options, token); } catch (MqttException e) { onFailure(token, e); } }
/** * An action has completed - if a completion listener has been set on the * token then invoke it with the outcome of the action. * * @param token */ public void fireActionEvent(MqttToken token) { final String methodName = "fireActionEvent"; if (token != null) { IMqttActionListener asyncCB = token.getActionCallback(); if (asyncCB != null) { if (token.getException() == null) { // @TRACE 716=call onSuccess key={0} log.fine(CLASS_NAME, methodName, "716", new Object[] { token.internalTok.getKey() }); asyncCB.onSuccess(token); } else { // @TRACE 717=call onFailure key {0} log.fine(CLASS_NAME, methodName, "716", new Object[] { token.internalTok.getKey() }); asyncCB.onFailure(token, token.getException()); } } } }
private void handleMessage(MqttPublish publishMessage) throws MqttException, Exception { final String methodName = "handleMessage"; // If quisecing process any pending messages. if (mqttCallback != null) { String destName = publishMessage.getTopicName(); // @TRACE 713=call messageArrived key={0} topic={1} log.fine(CLASS_NAME, methodName, "713", new Object[] { new Integer(publishMessage.getMessageId()), destName }); mqttCallback.messageArrived(destName, publishMessage.getMessage()); if (publishMessage.getMessage().getQos() == 1) { this.clientComms.internalSend(new MqttPubAck(publishMessage), new MqttToken(clientComms.getClient().getClientId())); } else if (publishMessage.getMessage().getQos() == 2) { this.clientComms.deliveryComplete(publishMessage); MqttPubComp pubComp = new MqttPubComp(publishMessage); this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId())); } } }
protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) { final String methodName = "notifyResult"; // unblock any threads waiting on the token token.internalTok.markComplete(ack, ex); // Let the user know an async operation has completed and then remove the token if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) { //@TRACE 648=key{0}, msg={1}, excep={2} log.fine(CLASS_NAME,methodName, "648", new Object [] {token.internalTok.getKey(), ack, ex}); callback.asyncOperationComplete(token); } // There are cases where there is no ack as the operation failed before // an ack was received if (ack == null ) { //@TRACE 649=key={0},excep={1} log.fine(CLASS_NAME,methodName, "649", new Object [] { token.internalTok.getKey(), ex}); callback.asyncOperationComplete(token); } }
/** * Disconnect the connection and reset all the states. */ public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException { // Allow current inbound and outbound work to complete clientState.quiesce(quiesceTimeout); MqttToken token = new MqttToken(client.getClientId()); try { // Send disconnect packet internalSend(new MqttDisconnect(), token); // Wait util the disconnect packet sent with timeout token.waitForCompletion(disconnectTimeout); } catch (Exception ex) { // ignore, probably means we failed to send the disconnect packet. } finally { token.internalTok.markComplete(null, null); shutdownConnection(token, null); } }
@Override public void deliveryComplete(final IMqttDeliveryToken token) { final Level level = Level.FINE; if (logger.isLoggable(level)) { logger.log(level, Messages.get("msg.mqtt.message.delivered"), token instanceof MqttToken ? ((MqttToken) token).internalTok : token); } }
protected void saveToken(MqttToken token, MqttWireMessage message) throws MqttException { final String methodName = "saveToken"; synchronized (tokens) { if (closedResponse == null) { String key = message.getKey(); //@TRACE 300=key={0} message={1} log.fine(className, methodName, "300", new Object[]{key, message}); saveToken(token, key); } else { throw closedResponse; } } }
protected void saveToken(MqttToken token, String key) { final String methodName = "saveToken"; synchronized (tokens) { //@TRACE 307=key={0} token={1} log.fine(className, methodName, "307", new Object[]{key, token.toString()}); token.internalTok.setKey(key); this.tokens.put(key, token); } }
private void handleActionComplete(MqttToken token) throws MqttException { final String methodName = "handleActionComplete"; synchronized (token) { // @TRACE 705=callback and notify for key={0} log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() }); // Unblock any waiters and if pending complete now set completed token.internalTok.notifyComplete(); if (!token.internalTok.isNotified()) { // If a callback is registered and delivery has finished // call delivery complete callback. if ( mqttCallback != null && token instanceof MqttDeliveryToken && token.isComplete()) { mqttCallback.deliveryComplete((MqttDeliveryToken) token); } // Now call async action completion callbacks fireActionEvent(token); } // Set notified so we don't tell the user again about this action. if ( token.isComplete() ){ if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) { token.internalTok.setNotified(true); } } if (token.isComplete()) { // Finish by doing any post processing such as delete // from persistent store but only do so if the action // is complete clientState.notifyComplete(token); } } }
public void asyncOperationComplete(MqttToken token) { final String methodName = "asyncOperationComplete"; if (running) { // invoke callbacks on callback thread completeQueue.addElement(token); synchronized (workAvailable) { // @TRACE 715=new workAvailable. key={0} log.fine(CLASS_NAME, methodName, "715", new Object[] { token.internalTok.getKey() }); workAvailable.notifyAll(); } } else { // invoke async callback on invokers thread try { handleActionComplete(token); } catch (Throwable ex) { // Users code could throw an Error or Exception e.g. in the case // of class NoClassDefFoundError // @TRACE 719=callback threw ex: log.fine(CLASS_NAME, methodName, "719", null, ex); // Shutdown likely already in progress but no harm to confirm clientComms.shutdownConnection(null, new MqttException(ex)); } } }
/** * Called by the CommsSender when a message has been sent * @param message */ protected void notifySent(MqttWireMessage message) { final String methodName = "notifySent"; this.lastOutboundActivity = System.currentTimeMillis(); //@TRACE 625=key={0} log.fine(CLASS_NAME,methodName,"625",new Object[]{message.getKey()}); MqttToken token = tokenStore.getToken(message); token.internalTok.notifySent(); if (message instanceof MqttPingReq) { synchronized (pingOutstandingLock) { long time = System.currentTimeMillis(); synchronized (pingOutstandingLock) { lastPing = time; pingOutstanding++; } //@TRACE 635=ping sent. pingOutstanding: {0} log.fine(CLASS_NAME,methodName,"635",new Object[]{ new Integer(pingOutstanding)}); } } else if (message instanceof MqttPublish) { if (((MqttPublish)message).getMessage().getQos() == 0) { // once a QoS 0 message is sent we can clean up its records straight away as // we won't be hearing about it again token.internalTok.markComplete(null, null); callback.asyncOperationComplete(token); decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); checkQuiesceLock(); } } }
/** * Called during shutdown to work out if there are any tokens still * to be notified and waiters to be unblocked. Notifying and unblocking * takes place after most shutdown processing has completed. The tokenstore * is tidied up so it only contains outstanding delivery tokens which are * valid after reconnect (if clean session is false) * @param reason The root cause of the disconnection, or null if it is a clean disconnect */ public Vector resolveOldTokens(MqttException reason) { final String methodName = "resolveOldTokens"; //@TRACE 632=reason {0} log.fine(CLASS_NAME,methodName,"632", new Object[] {reason}); // If any outstanding let the user know the reason why it is still // outstanding by putting the reason shutdown is occurring into the // token. MqttException shutReason = reason; if (reason == null) { shutReason = new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); } // Set the token up so it is ready to be notified after disconnect // processing has completed. Do not // remove the token from the store if it is a delivery token, it is // valid after a reconnect. Vector outT = tokenStore.getOutstandingTokens(); Enumeration outTE = outT.elements(); while (outTE.hasMoreElements()) { MqttToken tok = (MqttToken)outTE.nextElement(); synchronized (tok) { if (!tok.isComplete() && !tok.internalTok.isCompletePending() && tok.getException() == null) { tok.internalTok.setException(shutReason); } } if (!(tok instanceof MqttDeliveryToken)) { // If not a delivery token it is not valid on // restart so remove tokenStore.removeToken(tok.internalTok.getKey()); } } return outT; }
public MqttToken removeToken(String key) { final String methodName = "removeToken"; //@TRACE 306=key={0} log.fine(CLASS_NAME,methodName,"306",new Object[]{key}); if ( null != key ){ return (MqttToken) tokens.remove(key); } return null; }
protected void saveToken(MqttToken token, MqttWireMessage message) throws MqttException { final String methodName = "saveToken"; synchronized(tokens) { if (closedResponse == null) { String key = message.getKey(); //@TRACE 300=key={0} message={1} log.fine(CLASS_NAME,methodName,"300",new Object[]{key, message}); saveToken(token,key); } else { throw closedResponse; } } }
protected void saveToken(MqttToken token, String key) { final String methodName = "saveToken"; synchronized(tokens) { //@TRACE 307=key={0} token={1} log.fine(CLASS_NAME,methodName,"307",new Object[]{key,token.toString()}); token.internalTok.setKey(key); this.tokens.put(key, token); } }
/** * Sends a message to the server. Does not check if connected this validation must be done * by invoking routines. * @param message * @param token * @throws MqttException */ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "internalSend"; //@TRACE 200=internalSend key={0} message={1} token={2} log.fine(CLASS_NAME, methodName, "200", new Object[]{message.getKey(), message, token}); if (token.getClient() == null ) { // Associate the client with the token - also marks it as in use. token.internalTok.setClient(getClient()); } else { // Token is already in use - cannot reuse //@TRACE 213=fail: token in use: key={0} message={1} token={2} log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token}); throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE); } try { // Persist if needed and send the message this.clientState.send(message, token); } catch(MqttException e) { if (message instanceof MqttPublish) { this.clientState.undo((MqttPublish)message); } throw e; } }
/** * Sends a message to the broker if in connected state, but only waits for the message to be * stored, before returning. */ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "sendNoWait"; if (isConnected() || (!isConnected() && message instanceof MqttConnect) || (isDisconnecting() && message instanceof MqttDisconnect)) { this.internalSend(message, token); } else { //@TRACE 208=failed: not connected log.fine(CLASS_NAME, methodName, "208"); throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } }
private MqttToken handleOldTokens(MqttToken token, MqttException reason) { final String methodName = "handleOldTokens"; //@TRACE 222=> log.fine(CLASS_NAME,methodName,"222"); MqttToken tokToNotifyLater = null; try { // First the token that was related to the disconnect / shutdown may // not be in the token table - temporarily add it if not if (token != null) { if (tokenStore.getToken(token.internalTok.getKey())==null) { tokenStore.saveToken(token, token.internalTok.getKey()); } } Vector toksToNot = clientState.resolveOldTokens(reason); Enumeration toksToNotE = toksToNot.elements(); while(toksToNotE.hasMoreElements()) { MqttToken tok = (MqttToken)toksToNotE.nextElement(); if (tok.internalTok.getKey().equals(MqttDisconnect.KEY) || tok.internalTok.getKey().equals(MqttConnect.KEY)) { // Its con or discon so remember and notify @ end of disc routine tokToNotifyLater = tok; } else { // notify waiters and callbacks of outstanding tokens // that a problem has occurred and disconnect is in // progress callback.asyncOperationComplete(tok); } } }catch(Exception ex) { // Ignore as we are shutting down } return tokToNotifyLater; }
public void closeConnection(){ MqttToken token = new MqttToken(comms.getClient().getClientId()); comms.shutdownConnection(token, null); }
/** * Run loop to receive messages from the server. */ public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { //@TRACE 852=network read message log.fine(className, methodName, "852"); MqttWireMessage message = in.readMqttWireMessage(); if (message instanceof MqttAck) { token = tokenStore.getToken(message); if (token != null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck) message); } } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { // A new message has arrived clientState.notifyReceivedMsg(message); } } catch (MqttException ex) { //@TRACE 856=Stopping, MQttException log.fine(className, methodName, "856", null, ex); running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { //@TRACE 853=Stopping due to IOException log.fine(className, methodName, "853"); running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } // else { } } //@TRACE 854=< log.fine(className, methodName, "854"); }
public MqttToken getToken(String key) { return (MqttToken) tokens.get(key); }
public MqttToken removeToken(MqttWireMessage message) { if (message != null) { return removeToken(message.getKey()); } return null; }
/** * Run loop to receive messages from the server. */ public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { //@TRACE 852=network read message log.fine(CLASS_NAME,methodName,"852"); receiving = in.available() > 0; MqttWireMessage message = in.readMqttWireMessage(); receiving = false; if (message instanceof MqttAck) { token = tokenStore.getToken(message); if (token!=null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck)message); } } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { // A new message has arrived clientState.notifyReceivedMsg(message); } } catch (MqttException ex) { //@TRACE 856=Stopping, MQttException log.fine(CLASS_NAME,methodName,"856",null,ex); running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { //@TRACE 853=Stopping due to IOException log.fine(CLASS_NAME,methodName,"853"); running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } } finally { receiving = false; } } //@TRACE 854=< log.fine(CLASS_NAME,methodName,"854"); }
/** * Called by the CommsReceiver when an ack has arrived. * * @param message * @throws MqttException */ protected void notifyReceivedAck(MqttAck ack) throws MqttException { final String methodName = "notifyReceivedAck"; this.lastInboundActivity = System.currentTimeMillis(); // @TRACE 627=received key={0} message={1} log.fine(CLASS_NAME, methodName, "627", new Object[] { new Integer(ack.getMessageId()), ack }); MqttToken token = tokenStore.getToken(ack); MqttException mex = null; if (ack instanceof MqttPubRec) { // Complete the QoS 2 flow. Unlike all other // flows, QoS is a 2 phase flow. The second phase sends a // PUBREL - the operation is not complete until a PUBCOMP // is received MqttPubRel rel = new MqttPubRel((MqttPubRec) ack); this.send(rel, token); } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) { // QoS 1 & 2 notify users of result before removing from // persistence notifyResult(ack, token, mex); // Do not remove publish / delivery token at this stage // do this when the persistence is removed later } else if (ack instanceof MqttPingResp) { synchronized (pingOutstandingLock) { pingOutstanding = Math.max(0, pingOutstanding-1); notifyResult(ack, token, mex); if (pingOutstanding == 0) { tokenStore.removeToken(ack); } } //@TRACE 636=ping response received. pingOutstanding: {0} log.fine(CLASS_NAME,methodName,"636",new Object[]{ new Integer(pingOutstanding)}); } else if (ack instanceof MqttConnack) { int rc = ((MqttConnack) ack).getReturnCode(); if (rc == 0) { synchronized (queueLock) { if (cleanSession) { clearState(); // Add the connect token back in so that users can be // notified when connect completes. tokenStore.saveToken(token,ack); } inFlightPubRels = 0; actualInFlight = 0; restoreInflightMessages(); connected(); } } else { mex = ExceptionHelper.createMqttException(rc); throw mex; } clientComms.connectComplete((MqttConnack) ack, mex); notifyResult(ack, token, mex); tokenStore.removeToken(ack); // Notify the sender thread that there maybe work for it to do now synchronized (queueLock) { queueLock.notifyAll(); } } else { // Sub ack or unsuback notifyResult(ack, token, mex); releaseMessageId(ack.getMessageId()); tokenStore.removeToken(ack); } checkQuiesceLock(); }
/** * Called when waiters and callbacks have processed the message. For * messages where delivery is complete the message can be removed from * persistence and counters adjusted accordingly. Also tidy up by removing * token from store... * * @param message * @throws MqttException */ protected void notifyComplete(MqttToken token) throws MqttException { final String methodName = "notifyComplete"; MqttWireMessage message = token.internalTok.getWireMessage(); if (message != null && message instanceof MqttAck) { // @TRACE 629=received key={0} token={1} message={2} log.fine(CLASS_NAME, methodName, "629", new Object[] { new Integer(message.getMessageId()), token, message }); MqttAck ack = (MqttAck) message; if (ack instanceof MqttPubAck) { // QoS 1 - user notified now remove from persistence... persistence.remove(getSendPersistenceKey(message)); outboundQoS1.remove(new Integer(ack.getMessageId())); decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); // @TRACE 650=removed Qos 1 publish. key={0} log.fine(CLASS_NAME, methodName, "650", new Object[] { new Integer(ack.getMessageId()) }); } else if (ack instanceof MqttPubComp) { // QoS 2 - user notified now remove from persistence... persistence.remove(getSendPersistenceKey(message)); persistence.remove(getSendConfirmPersistenceKey(message)); outboundQoS2.remove(new Integer(ack.getMessageId())); inFlightPubRels--; decrementInFlight(); releaseMessageId(message.getMessageId()); tokenStore.removeToken(message); // @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1} log.fine(CLASS_NAME, methodName, "645", new Object[] { new Integer(ack.getMessageId()), new Integer(inFlightPubRels) }); } checkQuiesceLock(); } }
public MqttToken getToken(String key) { return (MqttToken)tokens.get(key); }
ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) { clientComms = cc; conToken = cToken; conPacket = cPacket; cBg = new Thread(this, "MQTT Con: "+getClient().getClientId()); }
DisconnectBG(MqttDisconnect disconnect, long quiesceTimeout, MqttToken token ) { this.disconnect = disconnect; this.quiesceTimeout = quiesceTimeout; this.token = token; }
/** * Based on the message type that has just been received return the associated * token from the token store or null if one does not exist. * * @param message whose token is to be returned * @return token for the requested message */ public MqttToken getToken(MqttWireMessage message) { String key = message.getKey(); return (MqttToken) tokens.get(key); }
/** * Based on the message type that has just been received return the associated * token from the token store or null if one does not exist. * @param message whose token is to be returned * @return token for the requested message */ public MqttToken getToken(MqttWireMessage message) { String key = message.getKey(); return (MqttToken)tokens.get(key); }