/**
 * Sends a heartbeat packet to keep the long-lived connection open.
 *
 * @return the delivery token of the published keep-alive message; the caller may
 *         choose to wait on it for completion
 * @throws MqttConnectivityException if {@code isConnected()} reports no connection
 * @throws MqttPersistenceException propagated from the underlying publish call
 * @throws MqttException propagated from the underlying publish call
 */
private synchronized MqttDeliveryToken sendKeepAlive()
        throws MqttConnectivityException, MqttPersistenceException, MqttException {
    if (!isConnected()) {
        throw new MqttConnectivityException();
    }

    // Look the keep-alive topic up once and reuse the cached instance afterwards.
    if (mKeepAliveTopic == null) {
        mKeepAliveTopic = mClient.getTopic(TOPIC);
    }

    Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);

    final byte[] payload = MQTT_KEEP_ALIVE_MESSAGE.getBytes();
    final MqttMessage keepAlive = new MqttMessage(payload);
    keepAlive.setQos(MQTT_KEEP_ALIVE_QOS);

    // Original author's note: the heartbeat is sent to the server, which then
    // calls back into the messageArrived method.
    return mKeepAliveTopic.publish(keepAlive);
}
/**
 * Publishes the keep-alive message to this device's keep-alive topic.
 *
 * <p>The topic name is built by formatting {@code MQTT_KEEP_ALIVE_TOPIC_FORAMT} with
 * {@code mDeviceId}; the resulting topic is cached in {@code mKeepAliveTopic}. The
 * payload is {@code MQTT_KEEP_ALIVE_MESSAGE}, sent at {@code MQTT_KEEP_ALIVE_QOS}.
 *
 * @return the delivery token of the published message; the caller may choose to wait
 *         on it for completion
 * @throws MqttConnectivityException if {@code isConnected()} reports no connection
 * @throws MqttPersistenceException propagated from the underlying publish call
 * @throws MqttException propagated from the underlying publish call
 */
private synchronized MqttDeliveryToken sendKeepAlive()
        throws MqttConnectivityException, MqttPersistenceException, MqttException {
    if (!isConnected()) {
        throw new MqttConnectivityException();
    }

    // Resolve the per-device topic lazily; later calls reuse the cached instance.
    if (mKeepAliveTopic == null) {
        final String topicName =
                String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT, mDeviceId);
        mKeepAliveTopic = mClient.getTopic(topicName);
    }

    Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);

    final MqttMessage keepAlive = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
    keepAlive.setQos(MQTT_KEEP_ALIVE_QOS);
    return mKeepAliveTopic.publish(keepAlive);
}
/** * @throws Exception */ public void initialise() throws Exception { random = new Random(); subscribedTopics = new HashMap<String, Integer>(); messages = new ArrayList<MqttMessage>(); topics = new ArrayList<String>(); lock = new Object(); retainedPublishes = new HashMap<String, MqttMessage>(); currentTokens = new HashMap<MqttDeliveryToken, String>(); client = clientFactory.createMqttClient(serverURI, CLIENTID); client.setCallback(this); // Clean any hungover state MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); client.connect(connOpts); client.disconnect(); }
/**
 * Publishes one generated message to the topic named by {@code rabbitQueueName} and
 * records the outcome in {@code stateService}.
 *
 * <p>If the MQTT client is missing or disconnected, the service is marked down and
 * nothing is sent. For a QoS above 0 the call blocks until the delivery token
 * completes. Any {@link MqttException} is logged (including its stack trace) and the
 * service is marked down; the exception is not propagated.
 */
@Override
public void publish() {
    if (mqttClient == null || !mqttClient.isConnected()) {
        log.debug("MQTT client unavailable");
        stateService.setRabbitDown();
        return;
    }

    Date now = new Date();
    String messageId = getMessageId();
    String messagePayload = getMessageBody(messageId, now);

    try {
        MqttTopic topic = mqttClient.getTopic(rabbitQueueName);
        MqttMessage mqttMessage = new MqttMessage(messagePayload.getBytes());
        mqttMessage.setQos(mqttQos);

        MqttDeliveryToken token = topic.publish(mqttMessage);
        if (mqttQos > 0) {
            // QoS 1/2 deliveries are acknowledged; wait for that before reporting success.
            token.waitForCompletion();
        }

        log.info("{} [{}] {}", instanceName, messageId, messagePayload);
        stateService.setRabbitUp();
    } catch (MqttException ex) {
        // Pass the exception as the trailing argument so the cause and stack trace
        // are logged instead of being silently dropped.
        log.warn("({}) Publish of MQTT message [{}] to RabbitMQ has failed",
                utils.getPublishedKey(consistencyChecker.getIndex()), messageId, ex);
        // REASON_CODE_CONNECTION_LOST is the named constant for reason code 32109.
        if (ex.getReasonCode() == MqttException.REASON_CODE_CONNECTION_LOST) {
            log.warn("Connection lost (unsupported QoS mode?)");
        }
        stateService.setRabbitDown();
    }
}
public void sendMessageToSensor(String data) { if(!isConnected()){ System.out.println("Not connected, aborting"); return; } // setup topic MqttTopic topic = mClient.getTopic(sharedPref.getString("pref_sensor", "")); int pubQoS = 2; MqttMessage message = new MqttMessage(data.getBytes()); message.setQos(pubQoS); message.setRetained(false); // Publish the message System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS + " with message " + message.toString()); MqttDeliveryToken token = null; try { // publish message to broker token = topic.publish(message); // Wait until the message has been delivered to the broker token.waitForCompletion(); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }
/** * Start a registered producer, so that it can start sending messages. * * @param publisher * to start. */ private void startProducer(MqttMessageProducer publisher) { logger.trace("Starting message producer for broker '{}'", name); publisher.setSenderChannel(new MqttSenderChannel() { @Override public void publish(String topic, byte[] payload) throws Exception { if (!started) { logger.warn("Broker connection not started. Cannot publish message to topic '{}'", topic); return; } // Create and configure a message MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retain); // publish message asynchronously MqttTopic mqttTopic = client.getTopic(topic); MqttDeliveryToken deliveryToken = mqttTopic.publish(message); logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic); if (!async) { // wait for publish confirmation deliveryToken.waitForCompletion(10000); if (!deliveryToken.isComplete()) { logger.error( "Did not receive completion message within timeout limit whilst publishing to topic '{}'", topic); } } } }); }
/**
 * Publishes a payload to a topic and optionally waits for delivery and/or for the
 * message to arrive back on a subscription.
 *
 * @param topic the topic to publish to
 * @param payload the payload to publish
 * @param qos the qos to publish at
 * @param retained whether to publish retained; retained messages are also recorded in
 *        {@code retainedPublishes} and always waited on for completion
 * @param subscribed whether we think we're currently subscribed to the topic; must
 *        agree with {@code subscribedTopics}
 * @param waitForCompletion whether we should wait for the message to complete delivery
 * @throws Exception if the expected subscription state does not match the recorded
 *         one, or if publishing or waiting fails
 */
public void publish(String topic, String payload, int qos, boolean retained, boolean subscribed,
        boolean waitForCompletion) throws Exception {
    logToFile("publish [topic:" + topic
            + "][payload:" + payload
            + "][qos:" + qos
            + "][retained:" + retained
            + "][subscribed:" + subscribed
            + "][waitForCompletion:" + waitForCompletion + "]");

    // The caller's view of the subscription must agree with our own records.
    final boolean recordedAsSubscribed = subscribedTopics.containsKey(topic);
    if (subscribed != recordedAsSubscribed) {
        throw new Exception("Subscription state mismatch [topic:" + topic + "][expected:" + subscribed + "]");
    }

    final MqttMessage outgoing = new MqttMessage(payload.getBytes());
    outgoing.setQos(qos);
    outgoing.setRetained(retained);
    if (retained) {
        retainedPublishes.put(topic, outgoing);
    }

    final MqttDeliveryToken delivery = client.getTopic(topic).publish(outgoing);

    // Track the token only while its delivery is still in flight.
    synchronized (currentTokens) {
        if (!delivery.isComplete()) {
            currentTokens.put(delivery, "[" + topic + "][" + outgoing.toString() + "]");
        }
    }

    final boolean mustBlock = retained || waitForCompletion;
    if (mustBlock) {
        delivery.waitForCompletion();
        synchronized (currentTokens) {
            currentTokens.remove(delivery);
        }
    }

    if (subscribed) {
        waitForMessage(topic, outgoing, false);
    }
}
/**
 * Runs the completion handling for a token: unblocks waiters, fires the user
 * callbacks at most once, and triggers post-processing when the action is complete.
 *
 * <p>All steps run while holding the monitor of {@code token}. The order of the
 * steps matters: waiters are notified first, callbacks are only fired if the token
 * has not already been marked as notified, and the notified flag is set before
 * {@code clientState.notifyComplete} is invoked.
 *
 * @param token the token whose action has (possibly partially) completed
 * @throws MqttException declared for the calls made here; which call raises it is
 *         not visible from this method alone
 */
private void handleActionComplete(MqttToken token) throws MqttException {
    final String methodName = "handleActionComplete";
    synchronized (token) {
        // @TRACE 705=callback and notify for key={0}
        log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });

        // Unblock any waiters and if pending complete now set completed
        token.internalTok.notifyComplete();

        // Only tell the user about this action if we have not done so before.
        if (!token.internalTok.isNotified()) {
            // If a callback is registered and delivery has finished
            // call delivery complete callback.
            if ( mqttCallback != null && token instanceof MqttDeliveryToken && token.isComplete()) {
                mqttCallback.deliveryComplete((MqttDeliveryToken) token);
            }
            // Now call async action completion callbacks
            fireActionEvent(token);
        }

        // Set notified so we don't tell the user again about this action.
        // This only happens for completed tokens that are delivery tokens or
        // whose action callback is an IMqttActionListener.
        if ( token.isComplete() ){
            if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
                token.internalTok.setNotified(true);
            }
        }

        if (token.isComplete()) {
            // Finish by doing any post processing such as delete
            // from persistent store but only do so if the action
            // is complete
            clientState.notifyComplete(token);
        }
    }
}
/** * Called during shutdown to work out if there are any tokens still * to be notified and waiters to be unblocked. Notifying and unblocking * takes place after most shutdown processing has completed. The tokenstore * is tidied up so it only contains outstanding delivery tokens which are * valid after reconnect (if clean session is false) * @param reason The root cause of the disconnection, or null if it is a clean disconnect */ public Vector resolveOldTokens(MqttException reason) { final String methodName = "resolveOldTokens"; //@TRACE 632=reason {0} log.fine(CLASS_NAME,methodName,"632", new Object[] {reason}); // If any outstanding let the user know the reason why it is still // outstanding by putting the reason shutdown is occurring into the // token. MqttException shutReason = reason; if (reason == null) { shutReason = new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); } // Set the token up so it is ready to be notified after disconnect // processing has completed. Do not // remove the token from the store if it is a delivery token, it is // valid after a reconnect. Vector outT = tokenStore.getOutstandingTokens(); Enumeration outTE = outT.elements(); while (outTE.hasMoreElements()) { MqttToken tok = (MqttToken)outTE.nextElement(); synchronized (tok) { if (!tok.isComplete() && !tok.internalTok.isCompletePending() && tok.getException() == null) { tok.internalTok.setException(shutReason); } } if (!(tok instanceof MqttDeliveryToken)) { // If not a delivery token it is not valid on // restart so remove tokenStore.removeToken(tok.internalTok.getKey()); } } return outT; }
/**
 * Delivery-complete callback; intentionally does nothing.
 *
 * @param token the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(MqttDeliveryToken token) {
    // We do not need this because we do not publish
}
/** * Start a registered producer, so that it can start sending messages. * * @param publisher * to start. */ private void startProducer(MqttMessageProducer publisher) { logger.trace("Starting message producer for broker '{}'", name); publisher.setSenderChannel(new MqttSenderChannel() { @Override public void publish(String topic, byte[] payload) throws Exception { if (!started) { logger.warn( "Broker connection not started. Cannot publish message to topic '{}'", topic); return; } // Create and configure a message MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retain); // publish message asynchronously MqttTopic mqttTopic = client.getTopic(topic); MqttDeliveryToken deliveryToken = mqttTopic.publish(message); logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic); if (!async) { // wait for publish confirmation deliveryToken.waitForCompletion(10000); if (!deliveryToken.isComplete()) { logger.error( "Did not receive completion message within timeout limit whilst publishing to topic '{}'", topic); } } } }); }
/**
 * Delivery-complete callback; intentionally does nothing.
 *
 * @param token the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(MqttDeliveryToken token) {
    // We do not need this because we do not publish
}
/**
 * Delivery-complete callback; intentionally left empty, so the notification is ignored.
 *
 * @param arg0 the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(MqttDeliveryToken arg0) {
    // Nothing to do.
}
/**
 * Publish-completion callback; intentionally left empty, so the notification is
 * ignored.
 *
 * @param arg0 the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(MqttDeliveryToken arg0) {
    // Nothing to do.
}
/**
 * Logs that a publish has completed (an acknowledgement has been received from the
 * broker).
 *
 * <p>The log entry is the value of {@code getDate()} followed by the
 * {@code Messages.MqttClientView_56} text, written through {@code syncOut}.
 *
 * @param token the delivery token of the completed publish (not inspected)
 */
public void deliveryComplete(MqttDeliveryToken token) {
    final String logEntry = getDate() + Messages.MqttClientView_56;
    syncOut(logEntry);
}
/**
 * Returns the delivery tokens that the token store currently reports as outstanding.
 *
 * @return the array obtained from {@code tokenStore.getOutstandingDelTokens()}
 */
public MqttDeliveryToken[] getPendingDeliveryTokens() {
    final MqttDeliveryToken[] outstanding = tokenStore.getOutstandingDelTokens();
    return outstanding;
}
/**
 * Delivery-complete callback; intentionally does nothing.
 *
 * @param token the delivery token passed to the callback (unused)
 */
@Override
public void deliveryComplete(MqttDeliveryToken token) {
    // no-op
}