1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| package com.urbmn.deviceCommunity.emqx;
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
@Slf4j @Component public class EmqxMqttClient { @Value("${emqx.broker}") private String broker;
@Value("${emqx.userName}") private String userName;
@Value("${emqx.passwd}") private String passwd;
@Value("${emqx.clientId}") private String clientId;
@Value("${emqx.topics.SYS_BROKERS}") private String SYS_BROKERS;
@Value("${emqx.topics.SYS_TOPIC_CONNECTED}") private String SYS_TOPIC_CONNECTED;
@Value("${emqx.topics.SYS_TOPIC_DISCONNECTED}") private String SYS_TOPIC_DISCONNECTED;
@Value("${emqx.topics.TOPIC_DEVICE_REQUEST}") private String TOPIC_DEVICE_REQUEST;
@Value("${emqx.topics.TOPIC_DEVICE_RESPONSE}") private String TOPIC_DEVICE_RESPONSE;
private MqttAsyncClient client;
public void init(String broker, String clientId, MemoryPersistence persistence) { try { this.client = new MqttAsyncClient(broker, clientId, persistence); } catch (MqttException e) { log.error("\n### [EmqxMqttClient - init] error:", e); } }
public MqttAsyncClient getClient(String broker, String clientId) { MemoryPersistence persistence = new MemoryPersistence(); this.init(broker, clientId, persistence); return this.client; }
public MqttAsyncClient getClient() { this.client = getClient(broker, clientId); return this.client; }
public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(userName); options.setPassword(passwd.toCharArray()); options.setCleanSession(false); options.setConnectionTimeout(90); options.setKeepAliveInterval(60); options.setMaxInflight(10000); options.setAutomaticReconnect(true); return options; }
public void connect(EmqxMqttCallback callback) throws MqttException { MqttAsyncClient client = this.getClient(); client.setCallback(callback); log.info("Connecting to broker: " + broker); IMqttToken token = client.connect(this.getOptions()); token.waitForCompletion(); log.info("Connected to broker: " + broker); }
public void subscribe() throws MqttException { String[] topics = {SYS_TOPIC_CONNECTED, SYS_TOPIC_DISCONNECTED, TOPIC_DEVICE_REQUEST}; int[] qoss = {2, 2, 2}; this.client.subscribe(topics, qoss); }
public void publish(String topic, MqttMessage message) { try { IMqttDeliveryToken token = this.client.publish(topic, message); token.waitForCompletion(); token.getResponse(); if (token.isComplete()) { log.info("\n##### EMQX - MQTT ##### publish complete,topic: {}, content: {}", topic, message.toString()); } } catch (Exception e) { log.error("\n### [EMQX mqtt publish error] Exception: ", e); } } }
|