使用SpringBoot的线程池和异步方法解决Mqtt掉线问题
Xplorist Lv6

使用SpringBoot的线程池和异步方法解决Mqtt掉线问题

目录

  • x
  • x
  • x

方案简述

  • 问题原因:SpringBoot后端服务中集成了mqtt客户端,如果客户端不能及时处理消息队列中的消息,消息处理就会长时间处于等待状态,最终导致mqtt连接因超时而断开;即使重连成功,也会因为同样的等待超时而再次掉线。
  • 解决方案:从上面的问题原因可知,解决此问题的核心是加快mqtt客户端处理消息的能力,因SpringBoot自带的线程池和异步方法处理多线程的性能很好,且在SpringBoot项目中集成简单方便,故作为解决此问题的核心技术方案。
  • 核心要点
    1. 容量足够大的线程池:这样有利于提高并发量,充分利用CPU的时间分片。
    2. 异步方法:主线程调用异步方法后,异步方法中的流程交给线程池中的子线程去执行,主线程不必等待子线程执行结束,直接继续后面的流程,这样可以快速结束当前主线程的处理,然后接收并处理下一个mqtt消息。
    3. 依赖注入:使用Spring的IoC机制对类的对象创建进行管理,所有要使用Spring的 @Async 注解的类都要通过@Service或@Component之类的注解来进入Spring的bean对象管理,只有这样才能使用AOP的机制,从而使用异步方法。
    4. AOP机制:使用 @Async 注解的方法A 不能直接调用同一个类中同样用 @Async 修饰的方法B,否则方法B的异步会失效、变成同步执行,解决方法是将方法B放到另外一个类中。简述原因:@Async 依赖AOP的动态代理,同一个类内部的方法调用不经过代理对象,因此异步无法生效。

核心代码

SpringBoot配置

  • SpringBoot的启动类添加 @EnableAsync 注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.urbmn.deviceCommunity;

import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.Connector;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.http11.AbstractHttp11Protocol;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

/**
 * Spring Boot entry point of the device-community service.
 *
 * <p>{@code @EnableAsync} switches on processing of {@code @Async} methods. The class also acts
 * as a {@link WebServerFactoryCustomizer}: when the embedded server is Tomcat it selects the NIO2
 * HTTP/1.1 protocol and logs the connector's connection/thread limits.
 */
@EnableAsync
@EnableRedisHttpSession
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
@MapperScan(basePackages = "com.urbmn.deviceCommunity.mapper")
@Configuration
@Slf4j
public class DeviceCommunityApplication implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {

    /** Fully qualified name of the Tomcat protocol handler selected by {@link #customize}. */
    private static final String NIO2_PROTOCOL = "org.apache.coyote.http11.Http11Nio2Protocol";

    public static void main(String[] args) {
        SpringApplication.run(DeviceCommunityApplication.class, args);
    }

    /**
     * Switches an embedded Tomcat to the NIO2 protocol and registers a connector customizer that
     * logs the effective limits.
     *
     * @param factory the servlet web server factory being configured; factories that are not
     *                Tomcat-based are left untouched instead of failing with a
     *                {@link ClassCastException}
     */
    @Override
    public void customize(ConfigurableServletWebServerFactory factory) {
        if (!(factory instanceof TomcatServletWebServerFactory)) {
            log.warn("Embedded server factory {} is not Tomcat; skipping Tomcat protocol customization",
                    factory.getClass().getName());
            return;
        }
        TomcatServletWebServerFactory tomcatFactory = (TomcatServletWebServerFactory) factory;
        tomcatFactory.setProtocol(NIO2_PROTOCOL);
        tomcatFactory.addConnectorCustomizers(this::logConnectorSettings);
    }

    /** Logs max connections, max threads and min spare threads of the connector's protocol handler. */
    private void logConnectorSettings(Connector connector) {
        ProtocolHandler protocol = connector.getProtocolHandler();
        if (!(protocol instanceof AbstractHttp11Protocol)) {
            // The limits below are only exposed by HTTP/1.1 protocol handlers.
            log.warn("Tomcat protocol handler {} is not an HTTP/1.1 handler; limits not logged",
                    protocol.getClass().getName());
            return;
        }
        AbstractHttp11Protocol<?> http11 = (AbstractHttp11Protocol<?>) protocol;
        log.info("########### Tomcat({}) -- MaxConnection:{};MaxThreads:{};MinSpareThreads:{}",
                protocol.getClass().getName(),
                http11.getMaxConnections(),
                http11.getMaxThreads(),
                http11.getMinSpareThreads());
    }
}

  • 添加SpringBoot异步方法配置类,配置异步方法的线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.urbmn.deviceCommunity.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 100;
int maxPoolSize = Runtime.getRuntime().availableProcessors() * 1200;
int queueCapacity = Runtime.getRuntime().availableProcessors() * 2400;
log.info("\n### [AsyncConfig] corePoolSize: {}, maxPoolSize: {}, queueCapacity: {}", corePoolSize, maxPoolSize, queueCapacity);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程池数量,方法: 返回可用处理器的Java虚拟机的数量。
executor.setCorePoolSize(corePoolSize);
// 最大线程数量
executor.setMaxPoolSize(maxPoolSize);
// 线程池的队列容量
executor.setQueueCapacity(queueCapacity);
// 线程名称的前缀
executor.setThreadNamePrefix("Executor-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
//executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}

/* 异步任务中异常处理 */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (Throwable exception, Method method, Object... params) -> {
log.error("ClassName: {}, MethodName: {}", method.getDeclaringClass().getName(), method.getName());
log.error("exceptionName: {}", exception.getClass().getName());
log.error("exceptionMessage: {}", exception.getMessage());
};
}
}

MQTT配置

  • mqtt跟随SpringBoot启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.urbmn.deviceCommunity;

import com.urbmn.deviceCommunity.emqx.EmqxMqttConnection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * {@link ApplicationRunner} that logs a banner and then asks {@link EmqxMqttConnection} to
 * create the MQTT connection.
 */
@Component
@Slf4j
public class EmqxApplicationRunner implements ApplicationRunner {

    /** Separator line logged above and below the banner text. */
    private static final String BANNER_RULE = "##################################################";

    @Resource
    private EmqxMqttConnection emqxConnection;

    /**
     * Logs the startup banner and creates the MQTT connection.
     *
     * @param args application arguments (not used)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info(BANNER_RULE);
        log.info("################### EmqxApplicationRunner... ...");
        log.info(BANNER_RULE);

        emqxConnection.create();
    }
}
  • mqtt连接类,关联mqtt客户端和mqtt回调处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.urbmn.deviceCommunity.emqx;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class EmqxMqttConnection {
/*
将EmqxMqttClient和EmqxMqttCallback拆开,目的是为了避免循环依赖,不然可以直接将connect的过程放进client的方法中执行
类A和类B循环依赖的解决办法:建立第三个类C来进行两个类的关联,使得类A不必依赖注入类B,这时即使类B依赖链条中会注入类A,
B依赖于A,但A已经不依赖于B,循环依赖已经被破坏
*/
@Resource
private EmqxMqttClient emqxMqttClient;

@Resource
private EmqxMqttCallback emqxMqttCallback;

public void create() {
try {
emqxMqttClient.connect(emqxMqttCallback);
emqxMqttClient.subscribe();
} catch (MqttException exception) {
log.error("\n### [EmqxMqttConnection create] error: ", exception);
}
}
}

  • SpringBoot配置文件中的mqtt配置

bootstrap.yml 或 application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# EMQX / MQTT settings. EmqxMqttClient reads emqx.broker, emqx.userName, emqx.passwd,
# emqx.clientId and emqx.topics.* through @Value, so those keys must be nested as shown.
emqx:
  broker: tcp://192.168.10.201:1883
  userName: emqx
  # NOTE(review): plaintext credential; consider supplying it from the environment or a secret store.
  passwd: emqx@2021
  clientId: DeviceCommunityMqtt
  topics:
    SYS_BROKERS: $SYS/brokers/
    SYS_TOPIC_CONNECTED: $SYS/brokers/+/clients/+/connected
    SYS_TOPIC_DISCONNECTED: $SYS/brokers/+/clients/+/disconnected
    TOPIC_DEVICE_REQUEST: MAC/#
    TOPIC_DEVICE_RESPONSE: CMD/
  # NOTE(review): the two sections below are assumed to live under "emqx"; confirm against the
  # class that reads them before relying on this nesting.
  kickOutDevice:
    url: http://192.168.10.201:18083/api/v4/clients/
    user: admin
    pass: public
  getDeviceDataTraffic:
    url: http://192.168.10.201:18083/api/v4/clients/username/device

mqtt客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.urbmn.deviceCommunity.emqx;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Wrapper around a Paho {@link MqttAsyncClient} configured from the {@code emqx.*} properties.
 *
 * <p>The class holds a single client instance. No synchronization is applied to that field.
 */
@Slf4j
@Component
public class EmqxMqttClient {
    @Value("${emqx.broker}")
    private String broker;

    @Value("${emqx.userName}")
    private String userName;

    @Value("${emqx.passwd}")
    private String passwd;

    @Value("${emqx.clientId}")
    private String clientId;

    @Value("${emqx.topics.SYS_BROKERS}")
    private String SYS_BROKERS;

    @Value("${emqx.topics.SYS_TOPIC_CONNECTED}")
    private String SYS_TOPIC_CONNECTED;

    @Value("${emqx.topics.SYS_TOPIC_DISCONNECTED}")
    private String SYS_TOPIC_DISCONNECTED;

    @Value("${emqx.topics.TOPIC_DEVICE_REQUEST}")
    private String TOPIC_DEVICE_REQUEST;

    @Value("${emqx.topics.TOPIC_DEVICE_RESPONSE}")
    private String TOPIC_DEVICE_RESPONSE;

    private MqttAsyncClient client;

    /**
     * Creates a new client and stores it in this instance. A creation failure is logged and
     * leaves the previously stored client (possibly {@code null}) in place.
     *
     * @param broker      broker URI
     * @param clientId    MQTT client identifier
     * @param persistence persistence store handed to the client
     */
    public void init(String broker, String clientId, MemoryPersistence persistence) {
        try {
            this.client = new MqttAsyncClient(broker, clientId, persistence);
        } catch (MqttException e) {
            log.error("\n### [EmqxMqttClient - init] error:", e);
        }
    }

    /**
     * Creates a client for the given broker and client id with in-memory persistence, replacing
     * the stored client.
     *
     * @return the stored client, or {@code null} if none could be created
     */
    public MqttAsyncClient getClient(String broker, String clientId) {
        MemoryPersistence persistence = new MemoryPersistence();
        this.init(broker, clientId, persistence);
        return this.client;
    }

    /**
     * Returns the client for the configured broker and client id, creating it on first use.
     *
     * <p>The instance is reused on later calls: creating a second client with the same client id
     * would orphan the first one without closing it.
     *
     * @return the stored client, or {@code null} if creation failed
     */
    public MqttAsyncClient getClient() {
        if (this.client == null) {
            getClient(broker, clientId);
        }
        return this.client;
    }

    /**
     * Builds the connect options: credentials, persistent session, timeouts, in-flight window
     * and automatic reconnect.
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passwd.toCharArray());
        // cleanSession=false keeps the session on the broker so that offline messages
        // (QoS 1 and QoS 2) are delivered after a reconnect.
        options.setCleanSession(false);
        options.setConnectionTimeout(90);
        options.setKeepAliveInterval(60);
        options.setMaxInflight(10000);
        options.setAutomaticReconnect(true);
        return options;
    }

    /**
     * Registers the callback and connects to the configured broker, blocking until the
     * connection attempt has completed.
     *
     * @param callback callback that receives connection-lost, message and delivery events
     * @throws MqttException if the client could not be created or the connection attempt fails
     */
    public void connect(EmqxMqttCallback callback) throws MqttException {
        MqttAsyncClient mqttClient = this.getClient();
        if (mqttClient == null) {
            // init() only logs a creation failure; report it here instead of failing with an NPE.
            throw new MqttException(
                    new IllegalStateException("MQTT client could not be created for broker " + broker));
        }
        mqttClient.setCallback(callback);
        if (mqttClient.isConnected()) {
            log.info("Already connected to broker: {}", broker);
            return;
        }
        log.info("Connecting to broker: {}", broker);
        IMqttToken token = mqttClient.connect(this.getOptions());
        token.waitForCompletion();
        log.info("Connected to broker: {}", broker);
    }

    /**
     * Subscribes at QoS 2 to the connected/disconnected system topics and the device request
     * topic, and waits for the broker's acknowledgement so that a failed subscription surfaces
     * as an exception instead of passing silently.
     *
     * <p>NOTE(review): this blocks; it is assumed not to be called from an MQTT callback thread.
     *
     * @throws MqttException if the subscription fails
     */
    public void subscribe() throws MqttException {
        String[] topics = {SYS_TOPIC_CONNECTED, SYS_TOPIC_DISCONNECTED, TOPIC_DEVICE_REQUEST};
        int[] qoss = {2, 2, 2};
        IMqttToken token = this.client.subscribe(topics, qoss);
        token.waitForCompletion();
    }

    /**
     * Publishes a message and blocks until delivery has completed. Any failure is logged and
     * not rethrown (best effort).
     *
     * @param topic   topic to publish to
     * @param message message to publish
     */
    public void publish(String topic, MqttMessage message) {
        try {
            IMqttDeliveryToken token = this.client.publish(topic, message);
            // waitForCompletion() returns normally only once the delivery is complete.
            token.waitForCompletion();
            log.info("\n##### EMQX - MQTT ##### publish complete,topic: {}, content: {}", topic, message);
        } catch (Exception e) {
            log.error("\n### [EMQX mqtt publish error] Exception: ", e);
        }
    }
}

mqtt回调处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.urbmn.deviceCommunity.emqx;

import com.urbmn.deviceCommunity.service.EmqxMqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class EmqxMqttCallback implements MqttCallback {
@Resource
private EmqxMqttService emqxMqttService;

@Override
public void connectionLost(Throwable throwable) {
log.error("\n### [EmqxMqttCallback - connectionLost] - ", throwable);
}

@Override
public void messageArrived(String mqttTopic, MqttMessage mqttMessage) throws Exception {
emqxMqttService.processMessage(mqttTopic, mqttMessage);
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("\n### [EmqxMqttCallback - deliveryComplete] - iMqttDeliveryToken: {}", iMqttDeliveryToken);
}
}

mqtt消息处理服务

  • EmqxMqttService 接口
1
2
3
4
5
6
7
8
9
package com.urbmn.deviceCommunity.service;

import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Processing of MQTT messages received through the subscriptions.
 */
public interface EmqxMqttService {

    /**
     * Processes one message received on a subscribed topic.
     *
     * @param mqttTopic   topic the message arrived on
     * @param mqttMessage the received message
     */
    void processMessage(String mqttTopic, MqttMessage mqttMessage);
}

  • EmqxMqttServiceImpl 实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.urbmn.deviceCommunity.service.impl;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.urbmn.deviceCommunity.config.Constants;
import com.urbmn.deviceCommunity.entity.EmqxConn;
import com.urbmn.deviceCommunity.entity.TbDevicesInfo;
import com.urbmn.deviceCommunity.enums.TopicEnum;
import com.urbmn.deviceCommunity.mqtt.EmqxConstants;
import com.urbmn.deviceCommunity.service.DeviceInfoService;
import com.urbmn.deviceCommunity.service.EmqxMqttService;
import com.urbmn.deviceCommunity.util.MacUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
public class EmqxMqttServiceImpl implements EmqxMqttService {
@Resource
private DeviceInfoService deviceInfoService;

// 处理订阅的mqtt消息
@Async
@Override
public void processMessage(String mqttTopic, MqttMessage mqttMessage) {
String clientId = "";
if (mqttTopic.startsWith(EmqxConstants.SYS_BROKERS)) {
EmqxConn emqxConn = Constants.gson.fromJson(mqttMessage.toString(), EmqxConn.class);
clientId = emqxConn.getClientid();
} else if (mqttTopic.startsWith("MAC")) {
String[] splits = mqttTopic.split("/");
if (splits.length > 3) {
clientId = splits[1];
String type = splits[2];
TopicEnum topicEnum = TopicEnum.getInstance(type);
if (topicEnum == TopicEnum.UNDEFINED) {
return; // 非设备信息模块消息不处理
}
}
}
// 判断是否社区设备
if (Strings.isBlank(clientId) || !deviceInfoService.ynCommunityDevice(clientId)) {
return; // 非社区设备的mqtt消息不处理
}
log.info("\n### [EmqxMqttCallback - DeviceCommunity - messageArrived] - mqttTopic:{}, mqttMessage: {}", mqttTopic, mqttMessage);

// 只处理社区的设备
if (mqttTopic.startsWith(EmqxConstants.SYS_BROKERS)) { // 系统消息,设备上线或下线
EmqxConn emqxConn = Constants.gson.fromJson(mqttMessage.toString(), EmqxConn.class);
if (emqxConn.getClientid().equals(EmqxConstants.clientid)) {
return; // 是本机的上下线数据则不处理
}
log.info("\n ### MQTT recv msg topic ###:" + mqttTopic);
log.info("\n ### MQTT emqxConn:" + emqxConn.getClientid() + " " + emqxConn.getIpaddress() + " topic:" + mqttTopic);
if (MacUtil.isMac(clientId)) {
TbDevicesInfo devicesInfo = new TbDevicesInfo();
devicesInfo.setDeviceMac(clientId);
log.info("\n ### MQTT [Mac Device] clientId = " + clientId + ", - topic: " + mqttTopic);
if (mqttTopic.contains("/disconnected")) { // 下线
log.info("\n ### [MQTT] device disconnected [off] 设备下线 clientid:" + clientId);
devicesInfo.setStatus(0);
} else { // 上线
log.info("\n ### [MQTT] device connected [on] 设备上线 clientid:" + clientId);
devicesInfo.setStatus(1);
}
// 【07】MQTT设备上下线更新状态
deviceInfoService.updateStatus(devicesInfo);
} else {
log.info("\n ### MQTT [Mac Device] clientId = " + clientId + ", - topic: " + mqttTopic);
if (mqttTopic.contains("/disconnected")) { // 下线
log.info("\n ### [MQTT] device disconnected [off] 设备下线 clientid:" + clientId);
deviceInfoService.kickOutDevice(clientId);
} else { // 上线
log.info("\n ### [MQTT] device connected [on] 设备上线 clientid:" + clientId);
}
}
}

// 设备消息
if (mqttTopic.startsWith("MAC")) {
JSONObject jsonObject = JSON.parseObject(mqttMessage.toString());
JSONObject content = jsonObject.getJSONObject("content");
String[] splits = mqttTopic.split("/");
if (splits.length > 3) {
clientId = splits[1];
String type = splits[2];
TopicEnum topicEnum = TopicEnum.getInstance(type);
switch (topicEnum) {
case LOGIN:
// 终端登录接口
deviceInfoService.saveLOGIN(clientId, content);
break;
case DTMU:
// DTMB升级接口
deviceInfoService.saveDTMU(clientId, content);
break;
case APPU:
// App升级接口
deviceInfoService.saveAPPU(clientId, content);
break;
case ROMU:
// Rom升级接口
deviceInfoService.saveROMU(clientId, content);
break;
case POST:
// 固定位素材接口
deviceInfoService.savePOST(clientId, content);
break;
case WARN:
// 终端报警接口
deviceInfoService.saveWARN(clientId, content);
break;
case DOWN:
// 下载状态上报
deviceInfoService.saveDOWN(clientId, content);
break;
case GPSS:
// GPS等信息上报
deviceInfoService.saveGPSS(clientId, jsonObject);
break;
case FACES:
// AI识别上报
deviceInfoService.saveFaces(clientId, jsonObject);
break;
case VOIC:
// 更新当前音量
deviceInfoService.updateVoice(clientId, jsonObject);
break;
case SCREENSTATUS:
// 屏幕状态上报
deviceInfoService.saveScreenStatus(clientId, jsonObject);
break;
//default:
//dis.saveLOGIN(content);
}
}
}
}
}

具体业务处理类

  • DeviceInfoService 接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.urbmn.deviceCommunity.service;

import com.alibaba.fastjson2.JSONObject;
import com.urbmn.deviceCommunity.dto.SavePublicVoiceDTO;
import com.urbmn.deviceCommunity.entity.TbDevicesInfo;
import com.urbmn.deviceCommunity.param.RecordLiveParam;
import com.urbmn.deviceCommunity.param.SavePrivateOnOffTimerParam;

import java.util.List;

/**
 * Business operations triggered by MQTT device messages.
 */
public interface DeviceInfoService {

    /** [01] MQTT: save a terminal login. */
    void saveLOGIN(String clientId, JSONObject content);

    /** [02] MQTT: save a DTMB upgrade message. */
    void saveDTMU(String clientId, JSONObject content);

    /** [03] MQTT: save an app upgrade message. */
    void saveAPPU(String clientId, JSONObject content);

    /** [04] MQTT: save a ROM upgrade message. */
    void saveROMU(String clientId, JSONObject content);

    /** [05] MQTT: save a fixed-position material message. */
    void savePOST(String clientId, JSONObject content);

    /** [06] MQTT: save a terminal alarm. */
    void saveWARN(String clientId, JSONObject content);

    /** [07] MQTT: update the status of a device that came online or went offline. */
    void updateStatus(TbDevicesInfo param);

    /** [08] MQTT: other push operations to a device. */
    void otherPush(String deviceMac, String operateType);

    /** [09] MQTT: save a download status report. */
    void saveDOWN(String clientId, JSONObject content);

    /** [10] MQTT: save a report of GPS and related information. */
    void saveGPSS(String clientId, JSONObject jsonObject);

    /** [11] MQTT: save an AI recognition report. */
    void saveFaces(String clientId, JSONObject jsonObject);

    /** [12] MQTT: update the current volume reported by a device. */
    void updateVoice(String clientId, JSONObject jsonObject);

    /** [13] Save a screen status report. */
    void saveScreenStatus(String clientId, JSONObject jsonObject);
}

  • DeviceInfoServiceImpl 实现类

因源代码过长,为了方便阅读,只展示 【07】MQTT设备上下线更新状态 接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.urbmn.deviceCommunity.service.impl;

import cn.hutool.core.date.DateUtil;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.urbmn.common.api.CommonResult;
import com.urbmn.deviceCommunity.constants.MessageConstants;
import com.urbmn.deviceCommunity.dto.SavePublicVoiceDTO;
import com.urbmn.deviceCommunity.elasticSearch.ES;
import com.urbmn.deviceCommunity.emqx.EmqxMqttClient;
import com.urbmn.deviceCommunity.entity.*;
import com.urbmn.deviceCommunity.enums.OperateTypeEnum;
import com.urbmn.deviceCommunity.mapper.*;
import com.urbmn.deviceCommunity.model.ChannelApp;
import com.urbmn.deviceCommunity.model.CheckParamDTO;
import com.urbmn.deviceCommunity.model.SavePostModel;
import com.urbmn.deviceCommunity.model.dto.ListScreenshotByTimesParam;
import com.urbmn.deviceCommunity.model.es.DeviceDataTraffic;
import com.urbmn.deviceCommunity.model.es.EsDeviceConnectLog;
import com.urbmn.deviceCommunity.model.vo.mapper.RecordLiveParamMapper;
import com.urbmn.deviceCommunity.openFeign.BusAdService;
import com.urbmn.deviceCommunity.openFeign.ResourceService;
import com.urbmn.deviceCommunity.param.RecordLiveParam;
import com.urbmn.deviceCommunity.param.SaveOnOffTimerPhaseParam;
import com.urbmn.deviceCommunity.param.SavePrivateOnOffTimerParam;
import com.urbmn.deviceCommunity.service.DeviceInfoService;
import com.urbmn.deviceCommunity.service.FileService;
import com.urbmn.deviceCommunity.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * Implementation of {@link DeviceInfoService}. Only the device online/offline status update is
 * shown in this excerpt.
 */
@Service
@Slf4j
public class DeviceInfoServiceImpl implements DeviceInfoService {
    @Autowired
    private TbDevicesInfoMapper tbDevicesInfoMapper;

    /**
     * [07] MQTT: updates the status of a device that came online (status 1) or went offline
     * (status 0) and indexes a connect-log entry.
     *
     * <p>For an offline event the device's Redis entry is deleted and the device is kicked out
     * first. The database row and the log entry are written only if the device is known.
     *
     * <p>NOTE(review): {@code redisTemplate} and {@code httpClientUtil} are not declared in this
     * excerpt; they are presumably fields of the omitted part of the class.
     *
     * @param param carries the device MAC and the new status; ignored when it or its status is
     *              {@code null}
     */
    @Async
    @Override
    public void updateStatus(TbDevicesInfo param) {
        if (param == null || param.getStatus() == null) {
            // Comparing a null Integer with 0 would throw on unboxing.
            log.warn("updateStatus ignored: missing device status, param: {}", param);
            return;
        }
        String deviceMac = param.getDeviceMac();
        boolean offline = param.getStatus() == 0;
        if (offline) {
            redisTemplate.delete(deviceMac);
            httpClientUtil.kickOutDevice(deviceMac);
        }

        TbDevicesInfo devicesInfo = tbDevicesInfoMapper.selectByDeviceMac(deviceMac);
        if (devicesInfo == null) {
            return;
        }

        // One timestamp is used for the log entry and the device row so that both agree.
        Date now = new Date();
        EsDeviceConnectLog connectLog = new EsDeviceConnectLog();
        connectLog.setId(UuidUtil.get());
        connectLog.setMac(deviceMac);
        connectLog.setTime(now.getTime());
        connectLog.setTimeStr(DateUtil.formatDateTime(now));
        if (offline) {
            connectLog.setType("disconnect");
            devicesInfo.setLastDisconnect(now);
        } else {
            connectLog.setType("connect");
            devicesInfo.setLastConnect(now);
        }
        devicesInfo.setStatus(param.getStatus());
        tbDevicesInfoMapper.updateByPrimaryKey(devicesInfo);
        ES.index("device_community_xa_connect_log", JSON.toJSONString(connectLog));
    }
}

 评论