/**
 * Publishes the "offline" snapshot of the Indego device: the online topic is set to
 * {@code false} and the data topics are reset to neutral values.
 *
 * @param mqttClient the connection to use
 * @throws MqttPersistenceException passed through from {@code publish}
 * @throws MqttException passed through from {@code publish}
 */
private void pushMqttStateOffline (MqttClient mqttClient) throws MqttPersistenceException, MqttException {
    LOG.info("Pushing offline state to MQTT");

    // The online flag is always published as retained, independent of RETAINMENT.
    publish(mqttClient, MQTT_TOPIC_ONLINE, false, true);

    // Device state topics.
    publish(mqttClient, MQTT_TOPIC_STATE_CODE, 0, RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_STATE_MESSAGE, "", RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_ERROR_CODE, 0, RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_STATE_LEVEL, -2, RETAINMENT);

    // Mowing progress and map topics.
    publish(mqttClient, MQTT_TOPIC_MOWED_PERCENTAGE, 0, RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MAP_SVG_CACHE_TS, 0, RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MAP_UPDATE_AVAILABLE, false, RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MOWED_TS, 0, RETAINMENT);

    // Runtime counters, published in the same order as before.
    final String[] runtimeTopics = {
        MQTT_TOPIC_RUNTIME_TOTAL_OPERATE_MINS,
        MQTT_TOPIC_RUNTIME_TOTAL_CHARGE_MINS,
        MQTT_TOPIC_RUNTIME_SESSION_OPERATE_MINS,
        MQTT_TOPIC_RUNTIME_SESSION_CHARGE_MINS
    };
    for (int i = 0; i < runtimeTopics.length; i++) {
        publish(mqttClient, runtimeTopics[i], 0, RETAINMENT);
    }
}
/**
 * Sends a heartbeat (keep-alive) message so that the long-lived connection stays open.
 *
 * @return the delivery token of the published message; callers may wait on it for completion
 * @throws MqttConnectivityException if the client is not connected
 * @throws MqttPersistenceException passed through from the publish call
 * @throws MqttException passed through from the MQTT client
 */
private synchronized MqttDeliveryToken sendKeepAlive()
        throws MqttConnectivityException, MqttPersistenceException, MqttException {
    if (!isConnected()) {
        throw new MqttConnectivityException();
    }

    // Look the keep-alive topic up once and reuse it on later calls.
    if (mKeepAliveTopic == null) {
        mKeepAliveTopic = mClient.getTopic(TOPIC);
    }

    Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);

    final byte[] payload = MQTT_KEEP_ALIVE_MESSAGE.getBytes();
    final MqttMessage heartbeat = new MqttMessage(payload);
    heartbeat.setQos(MQTT_KEEP_ALIVE_QOS);

    // Original author's note: the heartbeat sent to the server is expected to come
    // back through the messageArrived callback.
    return mKeepAliveTopic.publish(heartbeat);
}
/**
 * Reads the persisted data stored under the given key from the client directory.
 *
 * @param key the persistence key; the file read is {@code key + MESSAGE_FILE_EXTENSION}
 * @return the file contents wrapped in an {@code MqttPersistentData}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the file cannot be read completely
 */
public MqttPersistable get(String key) throws MqttPersistenceException {
    checkIsOpen();
    MqttPersistable result;
    try {
        File file = new File(clientDir, key + MESSAGE_FILE_EXTENSION);
        FileInputStream fis = new FileInputStream(file);
        byte[] data;
        try {
            int size = fis.available();
            data = new byte[size];
            int read = 0;
            while (read < size) {
                int count = fis.read(data, read, size - read);
                if (count < 0) {
                    // read() signals end-of-stream with -1; adding that to the offset
                    // would corrupt the loop instead of terminating it.
                    throw new IOException("Unexpected end of file while reading " + file.getName());
                }
                read += count;
            }
        } finally {
            // Release the file handle even when reading fails.
            fis.close();
        }
        result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0);
    } catch (IOException ex) {
        throw new MqttPersistenceException(ex);
    }
    return result;
}
/**
 * Scans the given directory for backup files and moves each one back onto the file
 * it was a backup of, replacing an existing file of that name if necessary. A stray
 * backup is expected only when writing the original file was interrupted, which is
 * why overwriting is considered safe.
 *
 * @param dir the directory in which to scan and restore backups
 * @throws MqttPersistenceException if the directory listing cannot be obtained
 */
private void restoreBackups(File dir) throws MqttPersistenceException {
    final FileFilter backupsOnly = new FileFilter() {
        public boolean accept(File candidate) {
            return candidate.getName().endsWith(MESSAGE_BACKUP_FILE_EXTENSION);
        }
    };

    final File[] backups = dir.listFiles(backupsOnly);
    if (backups == null) {
        throw new MqttPersistenceException();
    }

    for (int idx = 0; idx < backups.length; idx++) {
        final File backup = backups[idx];
        final String backupName = backup.getName();
        // Strip the backup extension to get the name of the original file.
        final String originalName =
                backupName.substring(0, backupName.length() - MESSAGE_BACKUP_FILE_EXTENSION.length());
        final File target = new File(dir, originalName);

        if (backup.renameTo(target)) {
            continue;
        }
        // The rename failed: remove the existing target and try once more.
        target.delete();
        backup.renameTo(target);
    }
}
/**
 * Publishes the keep-alive message on the device-specific keep-alive topic. The topic
 * is built from {@code MQTT_KEEP_ALIVE_TOPIC_FORAMT} and the device id on first use
 * and cached afterwards.
 *
 * @return the delivery token of the published message; callers may wait on it for completion
 * @throws MqttConnectivityException if the client is not connected
 * @throws MqttPersistenceException passed through from the publish call
 * @throws MqttException passed through from the MQTT client
 */
private synchronized MqttDeliveryToken sendKeepAlive()
        throws MqttConnectivityException, MqttPersistenceException, MqttException {
    if (!isConnected()) {
        throw new MqttConnectivityException();
    }

    if (mKeepAliveTopic == null) {
        final String topicName = String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT, mDeviceId);
        mKeepAliveTopic = mClient.getTopic(topicName);
    }

    Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);

    final MqttMessage keepAlive = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
    keepAlive.setQos(MQTT_KEEP_ALIVE_QOS);
    return mKeepAliveTopic.publish(keepAlive);
}
/**
 * Loads the persisted data for the given key from the client directory.
 *
 * @param key the persistence key; the file read is {@code key + MESSAGE_FILE_EXTENSION}
 * @return the file contents wrapped in an {@code MqttPersistentData}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the file cannot be read completely
 */
public MqttPersistable get(String key) throws MqttPersistenceException {
    checkIsOpen();
    MqttPersistable result;
    try {
        File file = new File(clientDir, key+MESSAGE_FILE_EXTENSION);
        FileInputStream fis = new FileInputStream(file);
        byte[] data;
        try {
            int size = fis.available();
            data = new byte[size];
            int read = 0;
            while (read<size) {
                int count = fis.read(data,read,size-read);
                if (count < 0) {
                    // End of stream reached early: -1 must not be added to the offset.
                    throw new IOException("Unexpected end of file while reading " + file.getName());
                }
                read += count;
            }
        }
        finally {
            // Always release the file handle, including on read failure.
            fis.close();
        }
        result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0);
    }
    catch(IOException ex) {
        throw new MqttPersistenceException(ex);
    }
    return result;
}
/**
 * Restores every backup file found in the given directory to its original file name,
 * overwriting an existing file of that name. Overwriting is considered safe because a
 * leftover backup is expected only when a write to the original file went wrong.
 *
 * @param dir the directory in which to scan and restore backups
 * @throws MqttPersistenceException if the directory listing cannot be obtained
 */
private void restoreBackups(File dir) throws MqttPersistenceException {
    File[] backupFiles = dir.listFiles(new FileFilter() {
        public boolean accept(File entry) {
            String entryName = entry.getName();
            return entryName.endsWith(MESSAGE_BACKUP_FILE_EXTENSION);
        }
    });
    if (backupFiles == null) {
        throw new MqttPersistenceException();
    }

    int position = 0;
    while (position < backupFiles.length) {
        File backupFile = backupFiles[position];
        String name = backupFile.getName();
        // The original file name is the backup name without the backup extension.
        int originalLength = name.length() - MESSAGE_BACKUP_FILE_EXTENSION.length();
        File originalFile = new File(dir, name.substring(0, originalLength));

        boolean renamed = backupFile.renameTo(originalFile);
        if (!renamed) {
            // Clear the way by deleting the existing file, then retry once.
            originalFile.delete();
            backupFile.renameTo(originalFile);
        }
        position++;
    }
}
/**
 * Starts connect processing: opens persistence for this client, clears it when a clean
 * session is requested, replaces the default MQTT version with 3.1.1 and hands the
 * connect to the comms layer. An {@code MqttException} from the comms layer is not
 * thrown but reported through {@code onFailure}.
 *
 * @throws MqttPersistenceException if opening or clearing the persistence store fails
 */
public void connect() throws MqttPersistenceException {
    final MqttToken connectToken = new MqttToken(client.getClientId());
    connectToken.setActionCallback(this);
    connectToken.setUserContext(this);

    persistence.open(client.getClientId(), client.getServerURI());

    final boolean cleanSession = options.isCleanSession();
    if (cleanSession) {
        persistence.clear();
    }

    final boolean versionUnset =
            MqttConnectOptions.MQTT_VERSION_DEFAULT == options.getMqttVersion();
    if (versionUnset) {
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    }

    try {
        comms.connect(options, connectToken);
    }
    catch (MqttException connectFailure) {
        onFailure(connectToken, connectFailure);
    }
}
/** * This removes the MqttSend message from the outbound queue and persistence. * @param message * @throws MqttPersistenceException */ protected void undo(MqttPublish message) throws MqttPersistenceException { final String methodName = "undo"; synchronized (queueLock) { //@TRACE 618=key={0} QoS={1} log.fine(CLASS_NAME,methodName,"618", new Object[]{new Integer(message.getMessageId()), new Integer(message.getMessage().getQos())}); if (message.getMessage().getQos() == 1) { outboundQoS1.remove(new Integer(message.getMessageId())); } else { outboundQoS2.remove(new Integer(message.getMessageId())); } pendingMessages.removeElement(message); persistence.remove(getSendPersistenceKey(message)); tokenStore.removeToken(message); checkQuiesceLock(); } }
/**
 * Reads the persisted data stored under the given key through a {@code FileConnection}
 * opened relative to the client directory URL.
 *
 * @param key the persistence key; the file read is {@code key + MESSAGE_FILE_EXTENSION}
 * @return the file contents wrapped in an {@code MqttPersistentData}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the file cannot be opened or read completely
 */
public MqttPersistable get(String key) throws MqttPersistenceException {
    checkIsOpen();
    MqttPersistable result;
    FileConnection file = null;
    DataInputStream fis = null;
    try {
        file = (FileConnection) Connector.open(clientDir.getURL() + key + MESSAGE_FILE_EXTENSION);
        fis = file.openDataInputStream();
        int size = fis.available();
        byte[] data = new byte[size];
        int read = 0;
        while (read < size) {
            int count = fis.read(data, read, size - read);
            if (count < 0) {
                // read() signals end-of-stream with -1; adding that to the offset
                // would corrupt the loop instead of terminating it.
                throw new IOException("Unexpected end of file while reading " + key);
            }
            read += count;
        }
        result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0);
    }
    catch (IOException ex) {
        throw new MqttPersistenceException(ex);
    }
    finally {
        // Release the stream and the connection on every path. A failure while closing
        // a read-only resource is deliberately ignored so it cannot mask the real outcome.
        if (fis != null) {
            try {
                fis.close();
            }
            catch (IOException ignored) {
                // best-effort cleanup
            }
        }
        if (file != null) {
            try {
                file.close();
            }
            catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }
    return result;
}
/** * disconnectClient stops any connectionLost processing that is happening * and disconnects the TCP/IP socket connection. * @throws MqttException */ public boolean disconnectClient() throws MqttException { synchronized( connLock ) { connLockNotified = true; connLock.notify(); } try { wmqttClient.disconnect(); } catch( MqttPersistenceException mqpe ) { // Persistence is not used } connected = false; return true; }
/**
 * Publishes the current message, prefixed with the current local date-time, once to the
 * MQTT topic of every other client and once to this client's Kafka base topic. All
 * publishes use QoS 1 and are not retained.
 *
 * <p>The payload is encoded as UTF-8 explicitly so that the bytes on the wire do not
 * depend on the platform default charset of the publishing JVM.
 *
 * @throws MqttException passed through from the MQTT client
 * @throws MqttPersistenceException passed through from the MQTT client
 */
private void publish() throws MqttException, MqttPersistenceException {
    for (final String otherClient : otherClients) {
        final String topic2Mqtt = pubTopic2Mqtt + otherClient + PUBPOSTFIX;
        LOG.info("{} publishing \"{}\" to topic {}", clientId, message, topic2Mqtt);
        final byte[] payload =
                (LocalDateTime.now() + message).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        mqttAsyncClient.publish(topic2Mqtt, payload, 1, false);
    }

    final String topic2Kafka = KAFKABASETOPIC.replace("{p}", clientId);
    LOG.info("{} publishing \"{}\" to topic {}", clientId, message, topic2Kafka);
    final byte[] kafkaPayload =
            (LocalDateTime.now() + message).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    mqttAsyncClient.publish(topic2Kafka, kafkaPayload, 1, false);
}
/**
 * Publishes one string value to a topic on the MQTT broker, using the configured QoS.
 *
 * @param mqttClient the broker connection
 * @param topic the topic to publish to, relative to the configured topic root
 * @param data the text to publish
 * @param retained whether the broker should retain the message
 * @throws MqttPersistenceException passed through from the MQTT client
 * @throws MqttException passed through from the MQTT client
 */
private void publish (MqttClient mqttClient, String topic, String data, boolean retained) throws MqttPersistenceException, MqttException {
    if ( LOG.isDebugEnabled() ) {
        final String trace = String.format("Publishing '%s' to topic '%s' (retained = %s)", data, topic, retained);
        LOG.debug(trace);
    }

    final byte[] payload = data.getBytes();
    final MqttMessage outgoing = new MqttMessage(payload);
    outgoing.setQos(configuration.getMqttQos());
    outgoing.setRetained(retained);

    // The topic root from the configuration is prepended to the relative topic.
    final String absoluteTopic = configuration.getMqttTopicRoot() + topic;
    mqttClient.publish(absoluteTopic, outgoing);
}
private void handleCommand(String command) throws SparkplugException, MqttPersistenceException, MqttException, IOException { String [] tokens = command.split(" "); if (tokens.length > 0) { String cmd = tokens[0]; if (cmd.equals("")) { return; } if (cmd.equals("?") || cmd.toLowerCase().equals("help")) { // Help with commands System.out.println("\nCOMMANDS"); System.out.println(" - rebirth: Publishes a rebirth command to the Edge Node"); System.out.println(" usage: rebirth"); return; } else if (cmd.toLowerCase().equals("rebirth")) { // Issue a rebirth client.publish(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode, new SparkplugBPayloadEncoder().getBytes(new SparkplugBPayloadBuilder() .addMetric(new MetricBuilder("Node Control/Rebirth", MetricDataType.Boolean, true) .createMetric()) .createPayload()), 0, false); return; } } System.out.println("\nInvalid command: " + command); }
/**
 * Publishes the textual form of the given state to the client-info topic with QoS 0.
 * Nothing is published when no client-info topic is configured.
 *
 * @param state the state whose {@code toString()} value is published
 * @throws MqttException passed through from the MQTT client
 * @throws MqttPersistenceException passed through from the MQTT client
 */
private void publishClientStatus(Boolean state) throws MqttException, MqttPersistenceException {
    if (nullOrEmpty(publishClientInfoTopic)) {
        return;
    }
    final byte[] payload = state.toString().getBytes();
    client.publish(publishClientInfoTopic, payload, 0, RETAINED);
}
/**
 * Opens the in-memory store for the given client and server. The backing table is
 * created only if none exists yet; a later call leaves the existing table and the
 * recorded identifiers untouched.
 *
 * @param clientId the client identifier; must not be {@code null}
 * @param serverURI the server URI; must not be {@code null}
 * @throws MqttPersistenceException if either argument is {@code null}
 */
@Override
public void open(String clientId, String serverURI) throws MqttPersistenceException {
    final boolean argumentMissing = (clientId == null) || (serverURI == null);
    if (argumentMissing) {
        throw new MqttPersistenceException();
    }

    if (dataStore != null) {
        // A store already exists; nothing to set up.
        return;
    }

    this.serverURI = serverURI;
    this.clientId = clientId;
    dataStore = new Hashtable<String, MqttPersistable> ();
}
public void close() throws MqttPersistenceException { synchronized (this) { // checkIsOpen(); if (fileLock != null) { fileLock.release(); } if (getFiles().length == 0) { clientDir.delete(); } clientDir = null; } }
/**
 * Deletes the file that holds the data for the given key, if such a file exists in the
 * persistence directory.
 *
 * @param key the persistence key; the file removed is {@code key + MESSAGE_FILE_EXTENSION}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 */
public void remove(String key) throws MqttPersistenceException {
    checkIsOpen();

    final String fileName = key + MESSAGE_FILE_EXTENSION;
    final File target = new File(clientDir, fileName);
    if (!target.exists()) {
        return;
    }
    target.delete();
}
/**
 * Lists the keys of all data currently held in the persistence directory. Each key is
 * the name of a message file with the message-file extension removed.
 *
 * @return an enumeration of the keys (as {@code String}s)
 * @throws MqttPersistenceException if the store is not open or the files cannot be listed
 */
public Enumeration keys() throws MqttPersistenceException {
    checkIsOpen();

    final File[] messageFiles = getFiles();
    final Vector keyList = new Vector(messageFiles.length);

    int position = 0;
    while (position < messageFiles.length) {
        final String fileName = messageFiles[position].getName();
        final int keyLength = fileName.length() - MESSAGE_FILE_EXTENSION.length();
        keyList.addElement(fileName.substring(0, keyLength));
        position++;
    }

    return keyList.elements();
}
/**
 * Lists the files in the client directory that are accepted by {@code FILE_FILTER}.
 *
 * @return the matching files; never {@code null}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the directory listing cannot be obtained
 */
private File[] getFiles() throws MqttPersistenceException {
    checkIsOpen();

    final File[] listing = clientDir.listFiles(FILE_FILTER);
    if (listing != null) {
        return listing;
    }
    // listFiles reports failure by returning null.
    throw new MqttPersistenceException();
}
/**
 * Deletes every file returned by {@code getFiles()}, emptying the store.
 *
 * @throws MqttPersistenceException if the store is not open or the files cannot be listed
 */
public void clear() throws MqttPersistenceException {
    checkIsOpen();

    final File[] stored = getFiles();
    int position = 0;
    while (position < stored.length) {
        stored[position].delete();
        position++;
    }
}
/**
 * Returns the header bytes of this message for persistence.
 *
 * @return the bytes produced by {@code getHeader()}
 * @throws MqttPersistenceException if {@code getHeader()} fails. The exception wraps the
 *         underlying cause of that failure or, when there is none, the failure itself
 *         so that the diagnostic information is never lost.
 */
public byte[] getHeaderBytes() throws MqttPersistenceException {
    try {
        return getHeader();
    } catch (MqttException ex) {
        // Unwrap as before when a cause exists; otherwise keep the exception itself
        // rather than wrapping null and discarding all information.
        Throwable cause = ex.getCause();
        throw new MqttPersistenceException(cause != null ? cause : ex);
    }
}
/**
 * Returns the payload bytes of this message for persistence.
 *
 * @return the bytes produced by {@code getPayload()}
 * @throws MqttPersistenceException if {@code getPayload()} fails. The exception wraps the
 *         underlying cause of that failure or, when there is none, the failure itself
 *         so that the diagnostic information is never lost.
 */
public byte[] getPayloadBytes() throws MqttPersistenceException {
    try {
        return getPayload();
    } catch (MqttException ex) {
        // Unwrap as before when a cause exists; otherwise keep the exception itself
        // rather than wrapping null and discarding all information.
        Throwable cause = ex.getCause();
        throw new MqttPersistenceException(cause != null ? cause : ex);
    }
}
/** * Initalizes the DeviceId and most instance variables * Including the Connection Handler, Datastore, Alarm Manager * and ConnectivityManager. */ @Override public void onCreate() { super.onCreate(); mDeviceId = String.format(DEVICE_ID_FORMAT, Secure.getString(getContentResolver(), Secure.ANDROID_ID)); HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME); thread.start(); mConnHandler = new Handler(thread.getLooper()); try { mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath()); } catch(MqttPersistenceException e) { e.printStackTrace(); mDataStore = null; mMemStore = new MemoryPersistence(); } mOpts = new MqttConnectOptions(); mOpts.setCleanSession(MQTT_CLEAN_SESSION); // Do not set keep alive interval on mOpts we keep track of it with alarm's mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE); mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE); }
/**
 * Shell command that publishes a message to an MQTT broker.
 *
 * @param topic the topic to publish to
 * @param payload the message text
 * @param qos the quality of service; validated with {@code MqttMessage.validateQos}
 * @param retained whether the message should be retained
 * @return a short confirmation when verbose output is enabled, otherwise {@code null}
 * @throws MqttPersistenceException passed through from the MQTT client
 * @throws MqttException passed through from the MQTT client
 */
@CliCommand(value = { "publish" }, help = "Publish a message to an MQTT Broker")
public String publish(
        @CliOption(key = { "topic" }, mandatory = true) String topic,
        @CliOption(key = { "", "message" }, mandatory = true) String payload,
        @CliOption(key = { "qos" }, mandatory = false, specifiedDefaultValue="1", unspecifiedDefaultValue="0") int qos,
        @CliOption(key = { "retained" }, mandatory = false, specifiedDefaultValue="true", unspecifiedDefaultValue="false") boolean retained)
        throws MqttPersistenceException, MqttException {
    MqttMessage.validateQos(qos);

    final byte[] body = payload.getBytes();
    client.publish(topic, body, qos, retained);

    if (!verbose) {
        return null;
    }
    return String.format("(sent %s)", payload);
}
/**
 * Removes the stored data for the given key by deleting its file from the persistence
 * directory. Nothing happens when no such file exists.
 *
 * @param key the persistence key; the file removed is {@code key + MESSAGE_FILE_EXTENSION}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 */
public void remove(String key) throws MqttPersistenceException {
    checkIsOpen();

    File stored = new File(clientDir, key + MESSAGE_FILE_EXTENSION);
    boolean present = stored.exists();
    if (present) {
        stored.delete();
    }
}
/**
 * Returns the keys of all data held in the persistence directory. A key is the name of
 * a message file without the message-file extension.
 *
 * @return an enumeration of the keys (as {@code String}s)
 * @throws MqttPersistenceException if the store is not open or the files cannot be listed
 */
public Enumeration keys() throws MqttPersistenceException {
    checkIsOpen();

    File[] entries = getFiles();
    Vector collected = new Vector(entries.length);
    for (int n = 0; n < entries.length; n++) {
        String name = entries[n].getName();
        // Drop the trailing extension to recover the key.
        int end = name.length() - MESSAGE_FILE_EXTENSION.length();
        String key = name.substring(0, end);
        collected.addElement(key);
    }
    return collected.elements();
}
/**
 * Empties the store by deleting every file returned by {@code getFiles()}.
 *
 * @throws MqttPersistenceException if the store is not open or the files cannot be listed
 */
public void clear() throws MqttPersistenceException {
    checkIsOpen();

    File[] entries = getFiles();
    int total = entries.length;
    for (int n = 0; n < total; n++) {
        File entry = entries[n];
        entry.delete();
    }
}
protected void deliveryComplete(MqttPublish message) throws MqttPersistenceException { final String methodName = "deliveryComplete"; //@TRACE 641=remove publish from persistence. key={0} log.fine(CLASS_NAME,methodName,"641", new Object[]{new Integer(message.getMessageId())}); persistence.remove(getReceivedPersistenceKey(message)); inboundQoS2.remove(new Integer(message.getMessageId())); }
public void close() throws MqttPersistenceException { // checkIsOpen(); //if (fileLock != null) { // fileLock.release(); //} if (!getFiles().hasMoreElements()) { try { clientDir.delete(); } catch (IOException e) { throw new MqttPersistenceException(); } } clientDir = null; }
/**
 * Lists the entries of the client directory whose names match
 * {@code "*" + MESSAGE_FILE_EXTENSION}.
 *
 * @return an enumeration of the matching entries; never {@code null}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()}),
 *         if listing the directory fails (the {@code IOException} is preserved as the
 *         cause) or if the listing is {@code null}
 */
private Enumeration getFiles() throws MqttPersistenceException {
    checkIsOpen();
    Enumeration files;
    try {
        files = clientDir.list("*" + MESSAGE_FILE_EXTENSION, false);
    } catch (IOException e) {
        // Keep the I/O failure as the cause instead of discarding it.
        throw new MqttPersistenceException(e);
    }
    if (files == null) {
        throw new MqttPersistenceException();
    }
    return files;
}
/**
 * Deletes the data with the specified key from the persistence directory. Nothing is
 * deleted when no file exists for the key.
 *
 * @param key the persistence key; the file removed is {@code key + MESSAGE_FILE_EXTENSION}
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the file cannot be opened or deleted
 */
public void remove(String key) throws MqttPersistenceException {
    checkIsOpen();
    FileConnection file = null;
    try {
        file = (FileConnection) Connector.open(clientDir.getURL() + key + MESSAGE_FILE_EXTENSION);
        if (file.exists()) {
            file.delete();
        }
    }
    catch (IOException ex) {
        throw new MqttPersistenceException(ex);
    }
    finally {
        // Every connection opened through Connector must be closed again, otherwise
        // each call leaks a connection. A close failure is ignored on purpose: the
        // delete has already succeeded or failed by this point.
        if (file != null) {
            try {
                file.close();
            }
            catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }
}
/**
 * Returns the keys of all data held in the persistence directory. A key is a listed
 * file name without the message-file extension.
 *
 * @return an enumeration of the keys (as {@code String}s)
 * @throws MqttPersistenceException if the store is not open or the files cannot be listed
 */
public Enumeration keys() throws MqttPersistenceException {
    checkIsOpen();

    final Vector collected = new Vector();
    for (Enumeration listing = getFiles(); listing.hasMoreElements(); ) {
        final String fileName = (String) listing.nextElement();
        final int keyLength = fileName.length() - MESSAGE_FILE_EXTENSION.length();
        collected.addElement(fileName.substring(0, keyLength));
    }
    return collected.elements();
}
/**
 * Tells whether data is stored under the given key, i.e. whether the file
 * {@code key + MESSAGE_FILE_EXTENSION} exists in the persistence directory.
 *
 * @param key the persistence key to look up
 * @return {@code true} if the file for the key exists
 * @throws MqttPersistenceException if the store is not open (via {@code checkIsOpen()})
 *         or the file cannot be opened
 */
public boolean containsKey(String key) throws MqttPersistenceException {
    checkIsOpen();
    FileConnection file = null;
    try {
        file = (FileConnection) Connector.open(clientDir.getURL() + key + MESSAGE_FILE_EXTENSION);
        return file.exists();
    }
    catch (IOException ex) {
        throw new MqttPersistenceException(ex);
    }
    finally {
        // Close the connection opened for the lookup so that repeated calls do not
        // leak connections. A close failure cannot change the answer and is ignored.
        if (file != null) {
            try {
                file.close();
            }
            catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }
}
/**
 * Empties the store by deleting every file returned by {@code getFiles()}.
 *
 * @throws MqttPersistenceException if the store is not open, the files cannot be
 *         listed, or a file cannot be opened or deleted
 */
public void clear() throws MqttPersistenceException {
    checkIsOpen();
    Enumeration files = getFiles();
    while (files.hasMoreElements()) {
        FileConnection next = null;
        try {
            next = (FileConnection) Connector.open(clientDir.getURL() + files.nextElement());
            next.delete();
        }
        catch (IOException ex) {
            throw new MqttPersistenceException(ex);
        }
        finally {
            // One connection is opened per file; close each one so that clearing a
            // large store does not leak connections. Close failures are ignored
            // because the delete has already been attempted.
            if (next != null) {
                try {
                    next.close();
                }
                catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }
}
/**
 * Publishes the given device state to the data topics and sets the online topic to
 * {@code true}.
 *
 * <p>The published state level is derived from the command associated with the decoded
 * status (MOW = 2, PAUSE = 1, RETURN = 0, anything else = -1) and is forced to -1
 * whenever the state reports a non-zero error.
 *
 * @param mqttClient the connection to use
 * @param state the Indego state to write out
 * @throws MqttPersistenceException passed through from {@code publish}
 * @throws MqttException passed through from {@code publish}
 */
private void pushMqttStateOnline (MqttClient mqttClient, DeviceStateInformation state) throws MqttPersistenceException, MqttException {
    LOG.info("Pushing online state to MQTT");

    final DeviceStatus status = DeviceStatus.decodeStatusCode(state.getState());

    int commandLevel = -1;
    switch ( status.getAssociatedCommand() ) {
    case MOW:
        commandLevel = 2;
        break;
    case PAUSE:
        commandLevel = 1;
        break;
    case RETURN:
        commandLevel = 0;
        break;
    default:
        break;
    }
    // An error always overrides the command-derived level.
    final int stateLevel = ( state.getError() != 0 ) ? -1 : commandLevel;

    // The online flag is always published as retained, independent of RETAINMENT.
    publish(mqttClient, MQTT_TOPIC_ONLINE, true, true);

    // Device state topics.
    publish(mqttClient, MQTT_TOPIC_STATE_CODE, status.getCode(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_STATE_MESSAGE, status.getMessage(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_ERROR_CODE, state.getError(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_STATE_LEVEL, stateLevel, RETAINMENT);

    // Mowing progress and map topics.
    publish(mqttClient, MQTT_TOPIC_MOWED_PERCENTAGE, state.getMowed(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MAP_SVG_CACHE_TS, state.getMapSvgCacheTimestamp(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MAP_UPDATE_AVAILABLE, state.isMapUpdateAvailable(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MOWED_TS, state.getMowedTimestamp(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_MOW_MODE, state.getMowMode(), RETAINMENT);

    // Runtime counters: totals first, then the current session.
    publish(mqttClient, MQTT_TOPIC_RUNTIME_TOTAL_OPERATE_MINS,
            state.getRuntime().getTotal().getOperate(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_RUNTIME_TOTAL_CHARGE_MINS,
            state.getRuntime().getTotal().getCharge(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_RUNTIME_SESSION_OPERATE_MINS,
            state.getRuntime().getSession().getOperate(), RETAINMENT);
    publish(mqttClient, MQTT_TOPIC_RUNTIME_SESSION_CHARGE_MINS,
            state.getRuntime().getSession().getCharge(), RETAINMENT);
}