Commit 55f3f423 authored by liuyan's avatar liuyan

fix:提交代码

parent ee329005
File added
File added
# docker-compose.yml
version: '3'
networks:
kafka-net:
driver: bridge
services:
kafka:
image: bitnami/kafka:latest
networks:
- kafka-net
environment:
- ALLOW_PLAINTEXT_LISTENER=yes # 新增
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=CONTROLLER://:9093,PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT # 修正
ports:
- "9092:9092" # 仅暴露客户端端口
centos:
image: centos:latest
container_name: centos
restart: unless-stopped
networks:
- kafka-net
ports:
- "2222:22" # SSH 端口映射(如果启用)
- "8082:8081"
# volumes:
# - ./data:/data # 数据卷挂载
environment:
- TZ=Asia/Shanghai # 设置时区
command: [ "/sbin/init" ] # 使用 systemd 作为 PID 1
{
"name": "RuoYi-Vue",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}
package com.ruoyi.api.client;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.user.dto.LoginDto;
import com.ruoyi.user.dto.RegisterDto;
import com.ruoyi.user.service.LoginService;
import com.ruoyi.user.service.RegisterService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/api")
@RestController
public class LoginApi {
@Resource(name = "ClientLoginService")
private LoginService loginService;
@Resource(name = "RegisterService")
private RegisterService registerService;
@PostMapping("/login")
@ApiOperation("登陆接口")
public AjaxResult login(@RequestBody LoginDto loginDto){
//进行登陆
AjaxResult login = loginService.login(loginDto);
return login;
}
/**
* @param registerDto :
* @return com.ruoyi.common.core.domain.AjaxResult
* @Version: v1.0
* @Author: LiuYan
* @Date 2025-5-5 12:22
*
* 注册
**/
@PostMapping("/register")
public AjaxResult register(@RequestBody RegisterDto registerDto){
AjaxResult register = registerService.register(registerDto);
return register;
}
}
package com.ruoyi.api.client;
import org.springframework.web.bind.annotation.RequestMapping;
@RequestMapping("/api/user")
public class UserApi {
}
package com.ruoyi.api.client;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.watch.service.INmyUserWatchBaseService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/watch")
public class UserWatchApi {
@Autowired
private INmyUserWatchBaseService nmyUserWatchBaseService;
@GetMapping("/getWatchPaths/{userId}/{deviceId}")
@ApiOperation("获取监控的资源")
public AjaxResult getWatchPaths(@PathVariable("userId") Long userId, @PathVariable("deviceId") Long deviceId){
AjaxResult ajaxResult1 = nmyUserWatchBaseService.selectNmyUserWatchBaseList(userId, deviceId);
return ajaxResult1;
}
}
package com.ruoyi.api.web;
import org.springframework.web.bind.annotation.RequestMapping;
@RequestMapping("/web/api")
public class WebLoginApi {
}
package com.ruoyi.grpc.service; package com.ruoyi.grpc.service;
import com.google.gson.Gson;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.ruoyi.file.enums.MsgFileType;
import com.ruoyi.file.enums.MsgHandlerType;
import com.ruoyi.grpc.file.*; import com.ruoyi.grpc.file.*;
import com.ruoyi.user.dto.ClientUserOnline; import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.service.TokenApiService; import com.ruoyi.user.service.TokenApiService;
...@@ -16,8 +19,10 @@ import org.springframework.beans.factory.annotation.Value; ...@@ -16,8 +19,10 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -49,7 +54,7 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -49,7 +54,7 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
private String changeBasePath; private String changeBasePath;
private FileOutputStream fos; private FileOutputStream fos;
private long totalSize = 0; private long totalSize = 0;
String serverPath = "";
@Override @Override
public void onNext(FileUploadRequest request) { public void onNext(FileUploadRequest request) {
try { try {
...@@ -60,15 +65,15 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -60,15 +65,15 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
this.token = request.getToken(); this.token = request.getToken();
this.changeBasePath = request.getBasePath(); this.changeBasePath = request.getBasePath();
// 解析用户信息 // 解析用户信息
ClientUserOnline loginUser = tokenApiService.getLoginUser(); ClientUserOnline loginUser = tokenApiService.getLoginUserByToken(token);
if (loginUser == null) { if (loginUser == null) {
throw new RuntimeException("无效Token: " + token); throw new RuntimeException("无效Token: " + token);
} }
this.userId = loginUser.getUserId(); this.userId = loginUser.getUserId();
this.deviceId = loginUser.getDeviceId(); this.deviceId = loginUser.getDeviceId();
serverPath = loginUser.getServerPath();
// 创建文件对象 // 创建文件对象
java.io.File ioFile = new java.io.File(uploadBasePath + filename); java.io.File ioFile = new java.io.File(getServerFilePath()+filename);
// 创建文件所在的目录 // 创建文件所在的目录
ioFile.getParentFile().mkdirs(); ioFile.getParentFile().mkdirs();
// 创建文件,如果文件存在则覆盖 // 创建文件,如果文件存在则覆盖
...@@ -96,6 +101,9 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -96,6 +101,9 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
cleanupResources(); cleanupResources();
System.err.println("客户端中断上传: " + t.getMessage()); System.err.println("客户端中断上传: " + t.getMessage());
} }
public String getServerFilePath(){
return uploadBasePath + serverPath + "/" ;
}
@Override @Override
public void onCompleted() { public void onCompleted() {
...@@ -105,13 +113,25 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -105,13 +113,25 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
.setStatus(Status.OK.toString()) .setStatus(Status.OK.toString())
.setSize(totalSize) .setSize(totalSize)
.build()); .build());
responseObserver.onCompleted();
//发消息让该同步的客户端进行同步的消息 //发消息让该同步的客户端进行同步的消息
List<WebSocketMsgDto> webSocketSendMsg = myUserWatchSubscribeService.getWebSocketSendMsg(userId, deviceId, changeBasePath); List<WebSocketMsgDto> webSocketSendMsg = myUserWatchSubscribeService.getWebSocketSendMsg(userId, deviceId, changeBasePath);
if (webSocketSendMsg.isEmpty()) {
//TODO responseObserver.onCompleted();
webSocketService.sendMsg(new ArrayList<>(),"add"); return;
}
for (WebSocketMsgDto webSocketMsgDto : webSocketSendMsg) {
//文件
webSocketMsgDto.setMsgFileType(MsgFileType.file.getCode());
//创建
webSocketMsgDto.setMsgHandlerType(MsgHandlerType.create.getCode());
//存储在服务器的路径
webSocketMsgDto.setServerFilePath(getServerFilePath());
webSocketMsgDto.setFileName(filename);
}
webSocketService.sendMsg(webSocketSendMsg);
responseObserver.onCompleted();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -140,10 +160,19 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -140,10 +160,19 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
@Override @Override
public void downloadFile(FileDownloadRequest request, public void downloadFile(FileDownloadRequest request,
StreamObserver<FileDownloadResponse> responseObserver) { StreamObserver<FileDownloadResponse> responseObserver) {
FileInputStream fis = null;
try { try {
String fullPath = Paths.get(request.getFilename()).toString();
java.io.File file = new java.io.File(fullPath);
java.io.File file = new java.io.File(uploadBasePath + request.getFilename()); // 检查文件是否存在
FileInputStream fis = new FileInputStream(file); if (!file.exists()) {
throw Status.NOT_FOUND.withDescription("File not found: " + request.getFilename()).asRuntimeException();
}
fis = new FileInputStream(file);
byte[] buffer = new byte[64 * 1024]; // 64KB chunks byte[] buffer = new byte[64 * 1024]; // 64KB chunks
int bytesRead; int bytesRead;
...@@ -153,10 +182,25 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase { ...@@ -153,10 +182,25 @@ public class FileServiceImpl extends FileServiceGrpc.FileServiceImplBase {
.setChunk(ByteString.copyFrom(buffer, 0, bytesRead)) .setChunk(ByteString.copyFrom(buffer, 0, bytesRead))
.build()); .build());
} }
fis.close();
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Exception e) {
// 根据异常类型映射状态码
Status status = Status.INTERNAL;
if (e instanceof FileNotFoundException) {
status = Status.NOT_FOUND.withDescription("File not found");
}
responseObserver.onError(status.withCause(e).asRuntimeException());
log.error(e.getMessage(), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) { } catch (IOException e) {
responseObserver.onError(e); log.error(e.getMessage(), e);
// 记录日志,但不再抛出新异常
System.err.println("Failed to close stream: " + e.getMessage());
}
}
} }
} }
......
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
#生产者配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
upload:
base_path : E:/server_file/ #文件上传至服务器的路径
# 自定义Kafka配置
kafka:
topic: pull-file
FileChangeTopic: file-change
group-id: test-group
#websocker异步发送消息线程池配置
# 当 核心线程数已用完,新任务会被放入队列。
# 如果队列已满且 当前线程数 < 最大线程数,会创建新线程执行任务。
# 如果队列已满且 当前线程数 = 最大线程数,触发拒绝策略(如 AbortPolicy、CallerRunsPolicy 等)。
task:
pool:
ws-send-msg:
core-pool-size: 5
max-pool-size: 10
queue-capacity: 50 #设置任务队列的最大容量(即队列中能缓存的待执行任务数)。当 核心线程数已用完,新任务会被放入队列
keep-alive-seconds: 60 #指的是当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间 单位秒
allow-core-timeout: false
thread-name-prefix: sendMsgThreadPoll-
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
</description> </description>
<dependencies> <dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
......
package com.ruoyi.common.constant;
public class NmyConstans {
//当前登录用户id
public static final String userId = "userId";
public static final String deviceId = "deviceId";
public static final String deviceFlag = "deviceFlag";
public static final String serverPath = "serverPath";
public static final String deviceIp = "deviceIp";
public static final String token = "token";
}
package com.ruoyi.common.enums;
public enum DelFlags {
no_del("0", "存在"), del("2", "删除");
private final String code;
private final String info;
DelFlags(String code, String info)
{
this.code = code;
this.info = info;
}
public String getCode()
{
return code;
}
public String getInfo()
{
return info;
}
}
package com.ruoyi.common.enums;
public enum DeviceOnLineStatus {
off_line("0", "掉线"), on_line("1", "在线");
private final String code;
private final String info;
DeviceOnLineStatus(String code, String info)
{
this.code = code;
this.info = info;
}
public String getCode()
{
return code;
}
public String getInfo()
{
return info;
}
}
package com.ruoyi.common.utils;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
public class MyFileUtils {
//创建文件夹
public static boolean createDirectoryUsingPath(String directoryPath) {
Path path = Paths.get(directoryPath);
try {
// 创建目录,如果父目录不存在会抛出异常
// Files.createDirectory(path);
// 创建目录,如果父目录不存在会一并创建
Files.createDirectories(path);
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
//删除文件夹
public static boolean deleteDirectory(String url) {
try {
Path directory = Paths.get(url);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
// 删除文件
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
if (exc == null) {
// 删除空目录
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
throw exc;
}
});
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
//删除文件
public static boolean deleteFile(String filePath) {
// 指定要删除的文件路径
Path path = Paths.get(filePath);
try {
// 尝试删除文件,如果文件不存在则抛出异常
// Files.delete(path);
// 尝试删除文件,如果文件不存在则不会抛出异常
if (Files.deleteIfExists(path)) {
System.out.println("文件删除成功");
return true;
} else {
System.out.println("文件不存在");
return true;
}
} catch (IOException e) {
System.out.println("文件删除失败: " + e.getMessage());
return false;
}
}
//重命名文件
public static boolean renameFile(String oldFilePath, String newFilePath) {
// 创建代表旧文件的 Path 对象
Path oldPath = Paths.get(oldFilePath);
// 创建代表新文件的 Path 对象
Path newPath = Paths.get(newFilePath);
try {
// 调用 Files.move 方法重命名文件
Files.move(oldPath, newPath, StandardCopyOption.REPLACE_EXISTING);
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
}
...@@ -114,7 +114,7 @@ public class SecurityConfig ...@@ -114,7 +114,7 @@ public class SecurityConfig
requests.antMatchers("/login", "/register", "/captchaImage").permitAll() requests.antMatchers("/login", "/register", "/captchaImage").permitAll()
// 静态资源,可匿名访问 // 静态资源,可匿名访问
.antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll() .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()
.antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**","/ws/**").permitAll() .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**","/ws/**","/api/**").permitAll()
// 除上面外的所有请求全部需要鉴权认证 // 除上面外的所有请求全部需要鉴权认证
.anyRequest().authenticated(); .anyRequest().authenticated();
}) })
......
package com.ruoyi.async;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Autowired
WebSocketSendMsgThreadPollConfig pollConfig;
@Bean(name = "webSocketSendMsgThreadPoll")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(pollConfig.getCorePoolSize());
executor.setMaxPoolSize(pollConfig.getMaxPoolSize());
executor.setQueueCapacity(pollConfig.getQueueCapacity());
executor.setThreadNamePrefix(pollConfig.getThreadNamePrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setKeepAliveSeconds(pollConfig.getKeepAliveSeconds());
executor.setAllowCoreThreadTimeOut(true);
// 关键配置:优雅关闭
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待所有任务完成
executor.setAwaitTerminationSeconds(30); // 最多等待30秒
executor.initialize();
return executor;
}
}
package com.ruoyi.async;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @return
* @Version: v1.0
* @Author: LiuYan
* @Date 2025-4-30 15:49
*
* 线程池配置,使用websocket发送消息的时候使用异步线程,用线程池
*
**/
@Configuration
@ConfigurationProperties(prefix = "task.pool.ws-send-msg")
public class WebSocketSendMsgThreadPollConfig {
private int corePoolSize;
private int maxPoolSize;
private int queueCapacity;
private int keepAliveSeconds;
private boolean allowCoreTimeout;
private String threadNamePrefix;
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public int getKeepAliveSeconds() {
return keepAliveSeconds;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public boolean isAllowCoreTimeout() {
return allowCoreTimeout;
}
public void setAllowCoreTimeout(boolean allowCoreTimeout) {
this.allowCoreTimeout = allowCoreTimeout;
}
public String getThreadNamePrefix() {
return threadNamePrefix;
}
public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
}
package com.ruoyi.device.controller;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.device.domain.NmyDevice;
import com.ruoyi.device.service.INmyDeviceService;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;
/**
* 设备Controller
*
* @author ruoyi
* @date 2025-04-29
*/
@RestController
@RequestMapping("/device/device")
public class NmyDeviceController extends BaseController
{
@Autowired
private INmyDeviceService nmyDeviceService;
/**
* 查询设备列表
*/
@PreAuthorize("@ss.hasPermi('device:device:list')")
@GetMapping("/list")
public TableDataInfo list(NmyDevice nmyDevice)
{
startPage();
List<NmyDevice> list = nmyDeviceService.selectNmyDeviceList(nmyDevice);
return getDataTable(list);
}
/**
* 导出设备列表
*/
@PreAuthorize("@ss.hasPermi('device:device:export')")
@Log(title = "设备", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, NmyDevice nmyDevice)
{
List<NmyDevice> list = nmyDeviceService.selectNmyDeviceList(nmyDevice);
ExcelUtil<NmyDevice> util = new ExcelUtil<NmyDevice>(NmyDevice.class);
util.exportExcel(response, list, "设备数据");
}
/**
* 获取设备详细信息
*/
@PreAuthorize("@ss.hasPermi('device:device:query')")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(nmyDeviceService.selectNmyDeviceById(id));
}
/**
* 新增设备
*/
@PreAuthorize("@ss.hasPermi('device:device:add')")
@Log(title = "设备", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody NmyDevice nmyDevice)
{
return toAjax(nmyDeviceService.insertNmyDevice(nmyDevice));
}
/**
* 修改设备
*/
@PreAuthorize("@ss.hasPermi('device:device:edit')")
@Log(title = "设备", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody NmyDevice nmyDevice)
{
return toAjax(nmyDeviceService.updateNmyDevice(nmyDevice));
}
/**
* 删除设备
*/
@PreAuthorize("@ss.hasPermi('device:device:remove')")
@Log(title = "设备", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(nmyDeviceService.deleteNmyDeviceByIds(ids));
}
}
package com.ruoyi.device.domain;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
/**
* 设备对象 nmy_device
*
* @author ruoyi
* @date 2025-04-29
*/
@Data
public class NmyDevice extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 用户id */
@Excel(name = "用户id")
private Long userId;
/** 设备唯一标识 */
@Excel(name = "设备唯一标识")
private String deviceFlag;
/** 设备是否在线(0:不在线 1:在线) */
@Excel(name = "设备是否在线(0:不在线 1:在线)")
private String onLine;
/** 设备登陆token值 */
@Excel(name = "设备登陆token值")
private String token;
/** 设备ip */
@Excel(name = "设备ip")
private String deviceIp;
@Excel(name = "服务器存储路径")
private String serverPath;
/** 删除标志(0代表存在 2代表删除) */
private String delFlag;
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("userId", getUserId())
.append("deviceFlag", getDeviceFlag())
.append("onLine", getOnLine())
.append("token", getToken())
.append("deviceIp", getDeviceIp())
.append("delFlag", getDelFlag())
.append("createTime", getCreateTime())
.append("updateTime", getUpdateTime())
.toString();
}
}
package com.ruoyi.device.mapper;
import java.util.List;
import com.ruoyi.device.domain.NmyDevice;
/**
* 设备Mapper接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface NmyDeviceMapper
{
/**
* 查询设备
*
* @param id 设备主键
* @return 设备
*/
public NmyDevice selectNmyDeviceById(Long id);
/**
* 查询设备列表
*
* @param nmyDevice 设备
* @return 设备集合
*/
public List<NmyDevice> selectNmyDeviceList(NmyDevice nmyDevice);
/**
* 新增设备
*
* @param nmyDevice 设备
* @return 结果
*/
public int insertNmyDevice(NmyDevice nmyDevice);
/**
* 修改设备
*
* @param nmyDevice 设备
* @return 结果
*/
public int updateNmyDevice(NmyDevice nmyDevice);
/**
* 删除设备
*
* @param id 设备主键
* @return 结果
*/
public int deleteNmyDeviceById(Long id);
/**
* 批量删除设备
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteNmyDeviceByIds(Long[] ids);
}
package com.ruoyi.device.service;
import java.util.List;
import com.ruoyi.device.domain.NmyDevice;
/**
* 设备Service接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface INmyDeviceService
{
/**
* 查询设备
*
* @param id 设备主键
* @return 设备
*/
public NmyDevice selectNmyDeviceById(Long id);
/**
* 查询设备列表
*
* @param nmyDevice 设备
* @return 设备集合
*/
public List<NmyDevice> selectNmyDeviceList(NmyDevice nmyDevice);
/**
* 新增设备
*
* @param nmyDevice 设备
* @return 结果
*/
public int insertNmyDevice(NmyDevice nmyDevice);
/**
* 修改设备
*
* @param nmyDevice 设备
* @return 结果
*/
public int updateNmyDevice(NmyDevice nmyDevice);
/**
* 批量删除设备
*
* @param ids 需要删除的设备主键集合
* @return 结果
*/
public int deleteNmyDeviceByIds(Long[] ids);
/**
* 删除设备信息
*
* @param id 设备主键
* @return 结果
*/
public int deleteNmyDeviceById(Long id);
/**
* 上线
*
* @param deviceId 设备主键
* @return 结果
*/
public void onLine(Long deviceId,String onLine);
}
package com.ruoyi.device.service.impl;
import java.util.List;
import com.ruoyi.common.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.device.mapper.NmyDeviceMapper;
import com.ruoyi.device.domain.NmyDevice;
import com.ruoyi.device.service.INmyDeviceService;
/**
* 设备Service业务层处理
*
* @author ruoyi
* @date 2025-04-29
*/
@Service
public class NmyDeviceServiceImpl implements INmyDeviceService
{
@Autowired
private NmyDeviceMapper nmyDeviceMapper;
/**
* 查询设备
*
* @param id 设备主键
* @return 设备
*/
@Override
public NmyDevice selectNmyDeviceById(Long id)
{
return nmyDeviceMapper.selectNmyDeviceById(id);
}
/**
* 查询设备列表
*
* @param nmyDevice 设备
* @return 设备
*/
@Override
public List<NmyDevice> selectNmyDeviceList(NmyDevice nmyDevice)
{
return nmyDeviceMapper.selectNmyDeviceList(nmyDevice);
}
/**
* 新增设备
*
* @param nmyDevice 设备
* @return 结果
*/
@Override
public int insertNmyDevice(NmyDevice nmyDevice)
{
nmyDevice.setCreateTime(DateUtils.getNowDate());
return nmyDeviceMapper.insertNmyDevice(nmyDevice);
}
/**
* 修改设备
*
* @param nmyDevice 设备
* @return 结果
*/
@Override
public int updateNmyDevice(NmyDevice nmyDevice)
{
nmyDevice.setUpdateTime(DateUtils.getNowDate());
return nmyDeviceMapper.updateNmyDevice(nmyDevice);
}
/**
* 批量删除设备
*
* @param ids 需要删除的设备主键
* @return 结果
*/
@Override
public int deleteNmyDeviceByIds(Long[] ids)
{
return nmyDeviceMapper.deleteNmyDeviceByIds(ids);
}
/**
* 删除设备信息
*
* @param id 设备主键
* @return 结果
*/
@Override
public int deleteNmyDeviceById(Long id)
{
return nmyDeviceMapper.deleteNmyDeviceById(id);
}
/**
* 上线
*
* @param deviceId 设备主键
* @return 结果
*/
@Override
public void onLine(Long deviceId,String online) {
NmyDevice nmyDevice = new NmyDevice();
nmyDevice.setId(deviceId);
nmyDevice.setOnLine(online);
nmyDevice.setUpdateTime(DateUtils.getNowDate());
nmyDeviceMapper.updateNmyDevice(nmyDevice);
}
}
package com.ruoyi.file.dto;
import com.ruoyi.file.enums.MsgFileType;
import com.ruoyi.file.enums.MsgHandlerType;
import lombok.Data;
@Data
public class FileChangeMsg {
private String realPath;
private MsgFileType fileType;
private MsgHandlerType handlerType;
//登陆的token
private String token;
//监控的基础资源路径 后边都是带/结尾的
private String basePath;
}
package com.ruoyi.file.enums;
public enum MsgFileType {
file(0, "文件"),folder(1, "文件夹");
private final Integer code;
private final String info;
MsgFileType(Integer code, String info)
{
this.code = code;
this.info = info;
}
public Integer getCode()
{
return code;
}
public String getInfo()
{
return info;
}
}
package com.ruoyi.file.enums;
public enum MsgHandlerType {
create(0, "创建"), delete(1, "删除"), rename(2, "重命名"),write(3,"修改");
private final Integer code;
private final String info;
MsgHandlerType(Integer code, String info)
{
this.code = code;
this.info = info;
}
public Integer getCode()
{
return code;
}
public String getInfo()
{
return info;
}
}
package com.ruoyi.kafka.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.utils.MyFileUtils;
import com.ruoyi.file.dto.FileChangeMsg;
import com.ruoyi.file.enums.MsgFileType;
import com.ruoyi.file.enums.MsgHandlerType;
import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.service.TokenApiService;
import com.ruoyi.watch.service.INmyUserWatchSubscribeService;
import com.ruoyi.websocket.dto.WebSocketMsgDto;
import com.ruoyi.websocket.service.WebSocketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
/**
* @return
* @Version: v1.0
* @Author: LiuYan
* @Date 2025-4-23 10:02
*
* 客户端创建文件夹服务端接收消息的消费者
**/
@Service
public class FileChangeConsumer {
@Value("${server.upload.base_path}")
String uploadBasePath;
@Autowired
private WebSocketService webSocketService;
@Autowired
private TokenApiService tokenApiService;
@Autowired
private INmyUserWatchSubscribeService myUserWatchSubscribeService;
@KafkaListener(
topics = "${kafka.FileChangeTopic}", // 动态注入Topic
groupId = "${kafka.group-id}" // 动态注入Group ID
)
public void consume(String message) throws IOException {
//获取基础路径
FileChangeMsg fileChangeMsg = JSONObject.parseObject(message, FileChangeMsg.class);
MsgHandlerType handlerType = fileChangeMsg.getHandlerType();
MsgFileType fileType = fileChangeMsg.getFileType();
String token = fileChangeMsg.getToken();
ClientUserOnline loginUserByToken = tokenApiService.getLoginUserByToken(token);
String serverUuidPath = loginUserByToken.getServerPath();
if (loginUserByToken == null) {
throw new RuntimeException("无效Token: " + token);
}
//发送给需要同步的client
List<WebSocketMsgDto> webSocketSendMsg = myUserWatchSubscribeService.getWebSocketSendMsg(loginUserByToken.getUserId(), loginUserByToken.getDeviceId(), fileChangeMsg.getBasePath());
if(handlerType == MsgHandlerType.create){
if(fileType == MsgFileType.file){
//创建文件,不用管逻辑,在上传文件实现了
} else if (fileType == MsgFileType.folder) {
//创建文件夹
MyFileUtils.createDirectoryUsingPath(uploadBasePath+serverUuidPath+"/"+fileChangeMsg.getRealPath());
//同步给客户端
if (webSocketSendMsg.isEmpty()) {
return;
}
for (WebSocketMsgDto webSocketMsgDto : webSocketSendMsg) {
//文件
webSocketMsgDto.setMsgFileType(MsgFileType.folder.getCode());
//创建
webSocketMsgDto.setMsgHandlerType(MsgHandlerType.create.getCode());
//存储在服务器的路径
webSocketMsgDto.setServerFilePath(uploadBasePath+serverUuidPath+"/");
webSocketMsgDto.setFileName(fileChangeMsg.getRealPath());
}
webSocketService.sendMsg(webSocketSendMsg);
}
} else if (handlerType == MsgHandlerType.delete) {
if(fileType == MsgFileType.file){
//删除文件
MyFileUtils.deleteFile(uploadBasePath+serverUuidPath+"/"+fileChangeMsg.getRealPath());
//同步给客户端
if (webSocketSendMsg.isEmpty()) {
return;
}
for (WebSocketMsgDto webSocketMsgDto : webSocketSendMsg) {
webSocketMsgDto.setMsgFileType(MsgFileType.file.getCode());
webSocketMsgDto.setMsgHandlerType(MsgHandlerType.delete.getCode());
webSocketMsgDto.setFileName(fileChangeMsg.getRealPath());
webSocketMsgDto.setServerFilePath(uploadBasePath+serverUuidPath+"/");
}
webSocketService.sendMsg(webSocketSendMsg);
} else if (fileType == MsgFileType.folder) {
//删除文件夹
MyFileUtils.deleteDirectory(uploadBasePath+fileChangeMsg.getRealPath());
//同步给客户端
if (webSocketSendMsg.isEmpty()) {
return;
}
for (WebSocketMsgDto webSocketMsgDto : webSocketSendMsg) {
webSocketMsgDto.setMsgFileType(MsgFileType.folder.getCode());
webSocketMsgDto.setMsgHandlerType(MsgHandlerType.delete.getCode());
webSocketMsgDto.setFileName(fileChangeMsg.getRealPath());
webSocketMsgDto.setServerFilePath(uploadBasePath+serverUuidPath+"/");
}
webSocketService.sendMsg(webSocketSendMsg);
}
} else if (handlerType == MsgHandlerType.rename) {
// if(fileType == MsgFileType.file){
// //重命名文件
// } else if (fileType == MsgFileType.folder) {
// //重命名文件夹
// }
//截取一下
String fileName = fileChangeMsg.getRealPath();
String[] split = fileName.split("→");
String oldPath = split[0];
String newPath = split[1];
MyFileUtils.renameFile(uploadBasePath+oldPath, uploadBasePath+newPath);
}
System.out.println("Received message: " + message);
}
}
package com.ruoyi.kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(
topics = "${kafka.topic}", // 动态注入Topic
groupId = "${kafka.group-id}" // 动态注入Group ID
)
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
package com.ruoyi.kafka.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
package com.ruoyi.user.controller;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.user.domain.NmyUser;
import com.ruoyi.user.service.INmyUserService;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;
/**
* 用户Controller
*
* @author ruoyi
* @date 2025-04-29
*/
@RestController
@RequestMapping("/user/user")
public class NmyUserController extends BaseController
{
@Autowired
private INmyUserService nmyUserService;
/**
* 查询用户列表
*/
@PreAuthorize("@ss.hasPermi('user:user:list')")
@GetMapping("/list")
public TableDataInfo list(NmyUser nmyUser)
{
startPage();
List<NmyUser> list = nmyUserService.selectNmyUserList(nmyUser);
return getDataTable(list);
}
/**
* 导出用户列表
*/
@PreAuthorize("@ss.hasPermi('user:user:export')")
@Log(title = "用户", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, NmyUser nmyUser)
{
List<NmyUser> list = nmyUserService.selectNmyUserList(nmyUser);
ExcelUtil<NmyUser> util = new ExcelUtil<NmyUser>(NmyUser.class);
util.exportExcel(response, list, "用户数据");
}
/**
* 获取用户详细信息
*/
@PreAuthorize("@ss.hasPermi('user:user:query')")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(nmyUserService.selectNmyUserById(id));
}
/**
* 新增用户
*/
@PreAuthorize("@ss.hasPermi('user:user:add')")
@Log(title = "用户", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody NmyUser nmyUser)
{
return toAjax(nmyUserService.insertNmyUser(nmyUser));
}
/**
* 修改用户
*/
@PreAuthorize("@ss.hasPermi('user:user:edit')")
@Log(title = "用户", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody NmyUser nmyUser)
{
return toAjax(nmyUserService.updateNmyUser(nmyUser));
}
/**
* 删除用户
*/
@PreAuthorize("@ss.hasPermi('user:user:remove')")
@Log(title = "用户", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(nmyUserService.deleteNmyUserByIds(ids));
}
}
package com.ruoyi.user.domain;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
/**
* 用户对象 nmy_user
*
* @author ruoyi
* @date 2025-04-29
*/
@Data
public class NmyUser extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 电话 */
@Excel(name = "电话")
private String phoneNumber;
/** 密码 */
@Excel(name = "密码")
private String password;
/** 删除标志(0代表存在 2代表删除) */
private String delFlag;
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("phoneNumber", getPhoneNumber())
.append("password", getPassword())
.append("delFlag", getDelFlag())
.append("createTime", getCreateTime())
.append("updateTime", getUpdateTime())
.toString();
}
}
package com.ruoyi.user.dto;
import lombok.Data;
//用户登录成功以后,封装的用户对象,方便根据token获取登陆的用户
@Data
public class ClientUserOnline {
//用户id
private Long userId;
//设备id
private Long deviceId;
//设备唯一标识
private String deviceFlag;
//服务器存储路径
private String serverPath;
//设备ip
private String deviceIp;
//当前登录的token
private String token;
}
package com.ruoyi.user.dto;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/** 登陆需要传的参数 **/
@Data
public class LoginDto {
//手机号
private String phone;
//验证码
private String code;
//设备唯一标识
private String deviceFlag;
//ip
private String deviceIp;
}
package com.ruoyi.user.dto;
import lombok.Data;
@Data
public class RegisterDto {
//手机号
private String phone;
//验证码
private String code;
//设备唯一标识
private String deviceFlag;
//ip
private String deviceIp;
//密码
private String password;
}
package com.ruoyi.user.mapper;
import java.util.List;
import com.ruoyi.user.domain.NmyUser;
/**
* 用户Mapper接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface NmyUserMapper
{
/**
* 查询用户
*
* @param id 用户主键
* @return 用户
*/
public NmyUser selectNmyUserById(Long id);
/**
* 查询用户列表
*
* @param nmyUser 用户
* @return 用户集合
*/
public List<NmyUser> selectNmyUserList(NmyUser nmyUser);
/**
* 新增用户
*
* @param nmyUser 用户
* @return 结果
*/
public int insertNmyUser(NmyUser nmyUser);
/**
* 修改用户
*
* @param nmyUser 用户
* @return 结果
*/
public int updateNmyUser(NmyUser nmyUser);
/**
* 删除用户
*
* @param id 用户主键
* @return 结果
*/
public int deleteNmyUserById(Long id);
/**
* 批量删除用户
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteNmyUserByIds(Long[] ids);
}
package com.ruoyi.user.service;
import java.util.List;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.user.domain.NmyUser;
import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.dto.LoginDto;
import com.ruoyi.user.dto.RegisterDto;
/**
* 用户Service接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface INmyUserService
{
/**
* 查询用户
*
* @param id 用户主键
* @return 用户
*/
public NmyUser selectNmyUserById(Long id);
/**
* 查询用户列表
*
* @param nmyUser 用户
* @return 用户集合
*/
public List<NmyUser> selectNmyUserList(NmyUser nmyUser);
/**
* 新增用户
*
* @param nmyUser 用户
* @return 结果
*/
public int insertNmyUser(NmyUser nmyUser);
/**
* 修改用户
*
* @param nmyUser 用户
* @return 结果
*/
public int updateNmyUser(NmyUser nmyUser);
/**
* 批量删除用户
*
* @param ids 需要删除的用户主键集合
* @return 结果
*/
public int deleteNmyUserByIds(Long[] ids);
/**
* 删除用户信息
*
* @param id 用户主键
* @return 结果
*/
public int deleteNmyUserById(Long id);
public ClientUserOnline getUserOnline(Long deviceId);
}
package com.ruoyi.user.service;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.user.dto.LoginDto;
public interface LoginService {
/**
* 登陆
*
* @param loginDto 用户登录参数对象
* @return 结果
*/
public AjaxResult login(LoginDto loginDto);
}
package com.ruoyi.user.service;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.user.dto.RegisterDto;
public interface RegisterService {
/**
* 注册
*
* @param registerDto 用户登录参数对象
* @return 结果
*/
public AjaxResult register(RegisterDto registerDto);
}
package com.ruoyi.user.service;
import com.ruoyi.common.constant.Constants;
import com.ruoyi.common.constant.NmyConstans;
import com.ruoyi.common.utils.ServletUtils;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.uuid.IdUtils;
import com.ruoyi.user.dto.ClientUserOnline;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
@Component
public class TokenApiService {
private static final Logger log = LoggerFactory.getLogger(TokenApiService.class);
// 令牌秘钥
@Value("${tokenapi.secret}")
private String secret;
// 令牌自定义标识
@Value("${tokenapi.header}")
private String header;
@Autowired
private INmyUserService nmyUserService;
/**
* 从数据声明生成令牌
*
* @param claims 数据声明
* @return 令牌
*/
private String createToken(Map<String, Object> claims)
{
String token = Jwts.builder()
.setClaims(claims)
.signWith(SignatureAlgorithm.HS512, secret).compact();
return token;
}
/**
* 创建令牌
*
* @param clientUserOnline 用户信息
* @return 令牌
*/
public String createToken(ClientUserOnline clientUserOnline)
{
String token = IdUtils.fastUUID();
Map<String, Object> claims = new HashMap<>();
clientUserOnline.setToken(token);
claims.put(NmyConstans.userId, clientUserOnline.getUserId());
claims.put(NmyConstans.deviceId, clientUserOnline.getDeviceId());
claims.put(NmyConstans.deviceFlag, clientUserOnline.getDeviceFlag());
claims.put(NmyConstans.deviceIp, clientUserOnline.getDeviceIp());
claims.put(NmyConstans.serverPath, clientUserOnline.getServerPath());
claims.put(NmyConstans.token, clientUserOnline.getToken());
return createToken(claims);
}
/**
* 获取请求token
*
* @return token
*/
public String getToken()
{
HttpServletRequest request = ServletUtils.getRequest();
String token = request.getHeader(header);
if (StringUtils.isNotEmpty(token) && token.startsWith(Constants.TOKEN_PREFIX))
{
token = token.replace(Constants.TOKEN_PREFIX, "");
}
return token;
}
/**
* 从令牌中获取数据声明
*
* @param token 令牌
* @return 数据声明
*/
public Claims parseToken(String token)
{
return Jwts.parser()
.setSigningKey(secret)
.parseClaimsJws(token)
.getBody();
}
public ClientUserOnline getLoginUser(){
String token = getToken();
if (StringUtils.isNotEmpty(token))
{
try
{
Claims claims = parseToken(token);
Long deviceId = claims.get(NmyConstans.deviceId, Long.class);
ClientUserOnline userOnline = nmyUserService.getUserOnline(deviceId);
return userOnline;
}
catch (Exception e)
{
log.error(e.getMessage(),e);
}
}
return null;
}
public ClientUserOnline getLoginUserByToken(String token){
if (StringUtils.isNotEmpty(token))
{
try
{
Claims claims = parseToken(token);
Long deviceId = claims.get(NmyConstans.deviceId, Long.class);
ClientUserOnline userOnline = nmyUserService.getUserOnline(deviceId);
return userOnline;
}
catch (Exception e)
{
log.error(e.getMessage(),e);
}
}
return null;
}
}
package com.ruoyi.user.service;
import com.ruoyi.common.constant.Constants;
import com.ruoyi.common.constant.NmyConstans;
import com.ruoyi.common.utils.ServletUtils;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.user.dto.ClientUserOnline;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.servlet.http.HttpServletRequest;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Service
public class WebSocketTokenApiService {
private static final Logger log = LoggerFactory.getLogger(WebSocketTokenApiService.class);
// 令牌秘钥
@Value("${tokenapi.secret}")
private String secret;
// 令牌自定义标识
@Value("${tokenapi.header}")
private String header;
@Autowired
private INmyUserService nmyUserService;
/**
* 从令牌中获取数据声明
*
* @param token 令牌
* @return 数据声明
*/
public Claims parseToken(String token)
{
return Jwts.parser()
.setSigningKey(secret)
.parseClaimsJws(token)
.getBody();
}
public ClientUserOnline getLoginUser(WebSocketSession session){
String token = getToken(session);
if (StringUtils.isNotEmpty(token))
{
try
{
Claims claims = parseToken(token);
Long deviceId = claims.get(NmyConstans.deviceId, Long.class);
ClientUserOnline userOnline = nmyUserService.getUserOnline(deviceId);
return userOnline;
}
catch (Exception e)
{
log.error(e.getMessage(),e);
}
}
return null;
}
/**
* 获取请求token
*
* @return token
*/
public String getToken(WebSocketSession session)
{
Map<String, Object> attributes = session.getAttributes();
HttpHeaders headers = (HttpHeaders) attributes.get("headers");
List<String> strings = headers.get("authorization");
if(CollectionUtils.isEmpty(strings)){
return "";
}
String token = strings.get(0);
if (StringUtils.isNotEmpty(token) && token.startsWith(Constants.TOKEN_PREFIX))
{
token = token.replace(Constants.TOKEN_PREFIX, "");
}
return token;
}
}
package com.ruoyi.user.service.impl;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.DeviceOnLineStatus;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.domain.NmyDevice;
import com.ruoyi.device.service.INmyDeviceService;
import com.ruoyi.user.domain.NmyUser;
import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.dto.LoginDto;
import com.ruoyi.user.mapper.NmyUserMapper;
import com.ruoyi.user.service.INmyUserService;
import com.ruoyi.user.service.LoginService;
import com.ruoyi.user.service.TokenApiService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.UUID;
@Service("ClientLoginService")
public class ClientLoginServiceImpl implements LoginService {
@Autowired
private INmyDeviceService nmyDeviceService;
@Autowired
private TokenApiService tokenApiService;
@Autowired
private INmyUserService nmyUserService;
@Override
public AjaxResult login(LoginDto loginDto) {
String phone = loginDto.getPhone();
String code = loginDto.getCode();
String deviceIp = loginDto.getDeviceIp();
String deviceFlag = loginDto.getDeviceFlag();
if(StringUtils.isEmpty(phone)){
return AjaxResult.error("手机号未传");
}
if(StringUtils.isEmpty(code)){
return AjaxResult.error("验证码未传");
}
//TODO 先注释功能完善在打开
if(StringUtils.isEmpty(deviceIp)){
return AjaxResult.error("设备ip未传");
}
if(StringUtils.isEmpty(deviceFlag)){
return AjaxResult.error("设备唯一标识未传");
}
//根据手机号查用户存在不存在
NmyUser queryUser = new NmyUser();
queryUser.setPhoneNumber(phone);
List<NmyUser> nmyUsers = nmyUserService.selectNmyUserList(queryUser);
if(CollectionUtils.isEmpty(nmyUsers)){
return AjaxResult.error("用户不存在");
}
// TODO 验证码先写死123456,其实应该在redis中拿取,进行对比
if(!code.equals("123456")){
return AjaxResult.error("登陆失败,验证码错误");
}
//验证通过进行登陆
NmyUser nmyUser = nmyUsers.get(0);
//查询设备的唯一标识在数据库中是否存在,如果不存在需要向数据库插入一条,如果已存在需要更新设备的ip,正常注册的时候就应该保存,但是ip可能会变
NmyDevice nmyDeviceQueryParam = new NmyDevice();
nmyDeviceQueryParam.setDeviceFlag(deviceFlag);
nmyDeviceQueryParam.setUserId(nmyUser.getId());
List<NmyDevice> nmyDevices = nmyDeviceService.selectNmyDeviceList(nmyDeviceQueryParam);
NmyDevice insertDevice = null;
if(CollectionUtils.isEmpty(nmyDevices)){
//数据库中没有这个设备,说明用户可能登陆了新的设备,需要插入
insertDevice = new NmyDevice();
insertDevice.setDeviceFlag(deviceFlag);
insertDevice.setUserId(nmyUser.getId());
insertDevice.setOnLine(DeviceOnLineStatus.on_line.getCode());
insertDevice.setDeviceIp(deviceIp);
insertDevice.setCreateTime(DateUtils.getNowDate());
insertDevice.setUpdateTime(DateUtils.getNowDate());
insertDevice.setServerPath(UUID.randomUUID().toString());
nmyDeviceService.insertNmyDevice(insertDevice);
}else {
//存在需要比对ip变没变,如果有变化需要更新
insertDevice = nmyDevices.get(0);
if(!insertDevice.getDeviceIp().equals(deviceIp)){
//更新ip,并且设置成上线
insertDevice.setDeviceIp(deviceIp);
insertDevice.setOnLine(DeviceOnLineStatus.on_line.getCode());
nmyDeviceService.updateNmyDevice(insertDevice);
}
}
String serverPath = insertDevice.getServerPath();
//生成token
ClientUserOnline clientUserOnline = new ClientUserOnline();
clientUserOnline.setUserId(nmyUser.getId());
clientUserOnline.setDeviceId(insertDevice.getId());
clientUserOnline.setDeviceIp(deviceIp);
clientUserOnline.setServerPath(serverPath);
clientUserOnline.setDeviceFlag(deviceFlag);
String token = tokenApiService.createToken(clientUserOnline);
//更新token
NmyDevice updateToken = new NmyDevice();
updateToken.setToken(token);
updateToken.setId(insertDevice.getId());
int i = nmyDeviceService.updateNmyDevice(updateToken);
clientUserOnline.setToken(token);
return AjaxResult.success(clientUserOnline);
}
}
package com.ruoyi.user.service.impl;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.DeviceOnLineStatus;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.domain.NmyDevice;
import com.ruoyi.device.service.INmyDeviceService;
import com.ruoyi.user.domain.NmyUser;
import com.ruoyi.user.dto.RegisterDto;
import com.ruoyi.user.mapper.NmyUserMapper;
import com.ruoyi.user.service.INmyUserService;
import com.ruoyi.user.service.RegisterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service("RegisterService")
public class ClientRegisterServiceImpl implements RegisterService {
@Autowired
private INmyUserService nmyUserService;
@Autowired
private INmyDeviceService nmyDeviceService;
/**
* 注册
*
* @param registerDto 注册
* @return 结果
*/
@Override
public AjaxResult register(RegisterDto registerDto) {
String phone = registerDto.getPhone();
String password = registerDto.getPassword();
String code = registerDto.getCode();
String deviceFlag = registerDto.getDeviceFlag();
String deviceIp = registerDto.getDeviceIp();
if(StringUtils.isEmpty(phone) || StringUtils.isEmpty(password) || StringUtils.isEmpty(code)){
return AjaxResult.error("登陆参数不完整");
}
if(StringUtils.isEmpty(deviceFlag) || StringUtils.isEmpty(deviceIp)){
return AjaxResult.error("设备参数缺失");
}
//todo 手机号校验格式,发送验证码的时候校验手机号是否已注册,验证码校验,密码加密
//保存用户
NmyUser saveUser = new NmyUser();
saveUser.setPhoneNumber(phone);
saveUser.setPassword(password);
int i = nmyUserService.insertNmyUser(saveUser);
//保存设备信息
NmyDevice deviceInfo = new NmyDevice();
deviceInfo.setUserId(saveUser.getId());
deviceInfo.setDeviceIp(deviceIp);
deviceInfo.setDeviceFlag(deviceFlag);
deviceInfo.setOnLine(DeviceOnLineStatus.off_line.getCode());
String serverPath = UUID.randomUUID().toString();
deviceInfo.setServerPath(serverPath);
nmyDeviceService.insertNmyDevice(deviceInfo);
return AjaxResult.success();
}
}
package com.ruoyi.user.service.impl;
import java.util.List;
import java.util.UUID;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.DeviceOnLineStatus;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.domain.NmyDevice;
import com.ruoyi.device.mapper.NmyDeviceMapper;
import com.ruoyi.device.service.INmyDeviceService;
import com.ruoyi.system.domain.SysUserOnline;
import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.dto.LoginDto;
import com.ruoyi.user.dto.RegisterDto;
import com.ruoyi.user.service.TokenApiService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.user.mapper.NmyUserMapper;
import com.ruoyi.user.domain.NmyUser;
import com.ruoyi.user.service.INmyUserService;
import org.springframework.util.CollectionUtils;
/**
* 用户Service业务层处理
*
* @author ruoyi
* @date 2025-04-29
*/
@Service
public class NmyUserServiceImpl implements INmyUserService
{
@Autowired
private NmyUserMapper nmyUserMapper;
@Autowired
private INmyDeviceService nmyDeviceService;
@Autowired
private TokenApiService tokenApiService;
/**
* 查询用户
*
* @param id 用户主键
* @return 用户
*/
@Override
public NmyUser selectNmyUserById(Long id)
{
return nmyUserMapper.selectNmyUserById(id);
}
/**
* 查询用户列表
*
* @param nmyUser 用户
* @return 用户
*/
@Override
public List<NmyUser> selectNmyUserList(NmyUser nmyUser)
{
return nmyUserMapper.selectNmyUserList(nmyUser);
}
/**
* 新增用户
*
* @param nmyUser 用户
* @return 结果
*/
@Override
public int insertNmyUser(NmyUser nmyUser)
{
nmyUser.setCreateTime(DateUtils.getNowDate());
return nmyUserMapper.insertNmyUser(nmyUser);
}
/**
* 修改用户
*
* @param nmyUser 用户
* @return 结果
*/
@Override
public int updateNmyUser(NmyUser nmyUser)
{
nmyUser.setUpdateTime(DateUtils.getNowDate());
return nmyUserMapper.updateNmyUser(nmyUser);
}
/**
* 批量删除用户
*
* @param ids 需要删除的用户主键
* @return 结果
*/
@Override
public int deleteNmyUserByIds(Long[] ids)
{
return nmyUserMapper.deleteNmyUserByIds(ids);
}
/**
* 删除用户信息
*
* @param id 用户主键
* @return 结果
*/
@Override
public int deleteNmyUserById(Long id)
{
return nmyUserMapper.deleteNmyUserById(id);
}
@Override
public ClientUserOnline getUserOnline(Long deviceId) {
NmyDevice nmyDevice = nmyDeviceService.selectNmyDeviceById(deviceId);
if(nmyDevice == null){
return null;
}
Long userId = nmyDevice.getUserId();
NmyUser nmyUser = nmyUserMapper.selectNmyUserById(userId);
if(nmyUser == null){
return null;
}
ClientUserOnline clientUserOnline = new ClientUserOnline();
clientUserOnline.setUserId(userId);
clientUserOnline.setDeviceId(deviceId);
clientUserOnline.setDeviceIp(nmyDevice.getDeviceIp());
clientUserOnline.setDeviceFlag(nmyDevice.getDeviceFlag());
clientUserOnline.setServerPath(nmyDevice.getServerPath());
clientUserOnline.setToken(nmyDevice.getToken());
return clientUserOnline;
}
}
package com.ruoyi.watch.controller;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.watch.domain.NmyUserWatchBase;
import com.ruoyi.watch.service.INmyUserWatchBaseService;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;
/**
* 用户监控的路径(资源)Controller
*
* @author ruoyi
* @date 2025-04-29
*/
@RestController
@RequestMapping("/watch/watchBase")
public class NmyUserWatchBaseController extends BaseController
{
@Autowired
private INmyUserWatchBaseService nmyUserWatchBaseService;
/**
* 查询用户监控的路径(资源)列表
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:list')")
@GetMapping("/list")
public TableDataInfo list(NmyUserWatchBase nmyUserWatchBase)
{
startPage();
List<NmyUserWatchBase> list = nmyUserWatchBaseService.selectNmyUserWatchBaseList(nmyUserWatchBase);
return getDataTable(list);
}
/**
* 导出用户监控的路径(资源)列表
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:export')")
@Log(title = "用户监控的路径(资源)", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, NmyUserWatchBase nmyUserWatchBase)
{
List<NmyUserWatchBase> list = nmyUserWatchBaseService.selectNmyUserWatchBaseList(nmyUserWatchBase);
ExcelUtil<NmyUserWatchBase> util = new ExcelUtil<NmyUserWatchBase>(NmyUserWatchBase.class);
util.exportExcel(response, list, "用户监控的路径(资源)数据");
}
/**
* 获取用户监控的路径(资源)详细信息
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:query')")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(nmyUserWatchBaseService.selectNmyUserWatchBaseById(id));
}
/**
* 新增用户监控的路径(资源)
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:add')")
@Log(title = "用户监控的路径(资源)", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody NmyUserWatchBase nmyUserWatchBase)
{
return toAjax(nmyUserWatchBaseService.insertNmyUserWatchBase(nmyUserWatchBase));
}
/**
* 修改用户监控的路径(资源)
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:edit')")
@Log(title = "用户监控的路径(资源)", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody NmyUserWatchBase nmyUserWatchBase)
{
return toAjax(nmyUserWatchBaseService.updateNmyUserWatchBase(nmyUserWatchBase));
}
/**
* 删除用户监控的路径(资源)
*/
@PreAuthorize("@ss.hasPermi('watch:watchBase:remove')")
@Log(title = "用户监控的路径(资源)", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(nmyUserWatchBaseService.deleteNmyUserWatchBaseByIds(ids));
}
}
package com.ruoyi.watch.controller;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.watch.domain.NmyUserWatchSubscribe;
import com.ruoyi.watch.service.INmyUserWatchSubscribeService;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;
/**
* 订阅的资源Controller
*
* @author ruoyi
* @date 2025-04-29
*/
@RestController
@RequestMapping("/watch/watchSubscribe")
public class NmyUserWatchSubscribeController extends BaseController
{
@Autowired
private INmyUserWatchSubscribeService nmyUserWatchSubscribeService;
/**
* 查询订阅的资源列表
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:list')")
@GetMapping("/list")
public TableDataInfo list(NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
startPage();
List<NmyUserWatchSubscribe> list = nmyUserWatchSubscribeService.selectNmyUserWatchSubscribeList(nmyUserWatchSubscribe);
return getDataTable(list);
}
/**
* 导出订阅的资源列表
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:export')")
@Log(title = "订阅的资源", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
List<NmyUserWatchSubscribe> list = nmyUserWatchSubscribeService.selectNmyUserWatchSubscribeList(nmyUserWatchSubscribe);
ExcelUtil<NmyUserWatchSubscribe> util = new ExcelUtil<NmyUserWatchSubscribe>(NmyUserWatchSubscribe.class);
util.exportExcel(response, list, "订阅的资源数据");
}
/**
* 获取订阅的资源详细信息
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:query')")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(nmyUserWatchSubscribeService.selectNmyUserWatchSubscribeById(id));
}
/**
* 新增订阅的资源
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:add')")
@Log(title = "订阅的资源", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
return toAjax(nmyUserWatchSubscribeService.insertNmyUserWatchSubscribe(nmyUserWatchSubscribe));
}
/**
* 修改订阅的资源
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:edit')")
@Log(title = "订阅的资源", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
return toAjax(nmyUserWatchSubscribeService.updateNmyUserWatchSubscribe(nmyUserWatchSubscribe));
}
/**
* 删除订阅的资源
*/
@PreAuthorize("@ss.hasPermi('watch:watchSubscribe:remove')")
@Log(title = "订阅的资源", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(nmyUserWatchSubscribeService.deleteNmyUserWatchSubscribeByIds(ids));
}
}
package com.ruoyi.watch.domain;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
/**
* 用户监控的路径(资源)对象 nmy_user_watch_base
*
* @author ruoyi
* @date 2025-04-29
*/
public class NmyUserWatchBase extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 监控的路径 */
@Excel(name = "监控的路径")
private String watchPath;
/** 用户id */
@Excel(name = "用户id")
private Long userId;
/** 设备id */
@Excel(name = "设备id")
private Long deviceId;
/** 删除标志(0代表存在 2代表删除) */
private String delFlag;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setWatchPath(String watchPath)
{
this.watchPath = watchPath;
}
public String getWatchPath()
{
return watchPath;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getUserId()
{
return userId;
}
public void setDeviceId(Long deviceId)
{
this.deviceId = deviceId;
}
public Long getDeviceId()
{
return deviceId;
}
public void setDelFlag(String delFlag)
{
this.delFlag = delFlag;
}
public String getDelFlag()
{
return delFlag;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("watchPath", getWatchPath())
.append("userId", getUserId())
.append("deviceId", getDeviceId())
.append("delFlag", getDelFlag())
.append("createTime", getCreateTime())
.toString();
}
}
package com.ruoyi.watch.domain;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
/**
* 订阅的资源对象 nmy_user_watch_subscribe
*
* @author ruoyi
* @date 2025-04-29
*/
public class NmyUserWatchSubscribe extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 监控源id */
@Excel(name = "监控源id")
private Long watchBaseId;
/** 用户id */
@Excel(name = "用户id")
private Long userId;
/** 设备id */
@Excel(name = "设备id")
private Long deviceId;
/** 对应的同步路径 */
@Excel(name = "对应的同步路径")
private String syncPath;
/** 删除标志(0代表存在 2代表删除) */
private String delFlag;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setWatchBaseId(Long watchBaseId)
{
this.watchBaseId = watchBaseId;
}
public Long getWatchBaseId()
{
return watchBaseId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getUserId()
{
return userId;
}
public void setDeviceId(Long deviceId)
{
this.deviceId = deviceId;
}
public Long getDeviceId()
{
return deviceId;
}
public void setSyncPath(String syncPath)
{
this.syncPath = syncPath;
}
public String getSyncPath()
{
return syncPath;
}
public void setDelFlag(String delFlag)
{
this.delFlag = delFlag;
}
public String getDelFlag()
{
return delFlag;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("watchBaseId", getWatchBaseId())
.append("userId", getUserId())
.append("deviceId", getDeviceId())
.append("syncPath", getSyncPath())
.append("createTime", getCreateTime())
.append("delFlag", getDelFlag())
.toString();
}
}
package com.ruoyi.watch.mapper;
import java.util.List;
import com.ruoyi.watch.domain.NmyUserWatchBase;
/**
* 用户监控的路径(资源)Mapper接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface NmyUserWatchBaseMapper
{
/**
* 查询用户监控的路径(资源)
*
* @param id 用户监控的路径(资源)主键
* @return 用户监控的路径(资源)
*/
public NmyUserWatchBase selectNmyUserWatchBaseById(Long id);
/**
* 查询用户监控的路径(资源)列表
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 用户监控的路径(资源)集合
*/
public List<NmyUserWatchBase> selectNmyUserWatchBaseList(NmyUserWatchBase nmyUserWatchBase);
/**
* 新增用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
public int insertNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase);
/**
* 修改用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
public int updateNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase);
/**
* 删除用户监控的路径(资源)
*
* @param id 用户监控的路径(资源)主键
* @return 结果
*/
public int deleteNmyUserWatchBaseById(Long id);
/**
* 批量删除用户监控的路径(资源)
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteNmyUserWatchBaseByIds(Long[] ids);
}
package com.ruoyi.watch.mapper;
import java.util.List;
import com.ruoyi.watch.domain.NmyUserWatchSubscribe;
/**
* 订阅的资源Mapper接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface NmyUserWatchSubscribeMapper
{
/**
* 查询订阅的资源
*
* @param id 订阅的资源主键
* @return 订阅的资源
*/
public NmyUserWatchSubscribe selectNmyUserWatchSubscribeById(Long id);
/**
* 查询订阅的资源列表
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 订阅的资源集合
*/
public List<NmyUserWatchSubscribe> selectNmyUserWatchSubscribeList(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 根据baseId 获取在线的 订阅者
*/
public List<NmyUserWatchSubscribe> selectOnLingSubscriberByBaseId(Long watchBaseId);
/**
* 新增订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
public int insertNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 修改订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
public int updateNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 删除订阅的资源
*
* @param id 订阅的资源主键
* @return 结果
*/
public int deleteNmyUserWatchSubscribeById(Long id);
/**
* 批量删除订阅的资源
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteNmyUserWatchSubscribeByIds(Long[] ids);
}
package com.ruoyi.watch.service;
import java.util.List;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.watch.domain.NmyUserWatchBase;
/**
* 用户监控的路径(资源)Service接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface INmyUserWatchBaseService
{
/**
* 查询用户监控的路径(资源)
*
* @param id 用户监控的路径(资源)主键
* @return 用户监控的路径(资源)
*/
public NmyUserWatchBase selectNmyUserWatchBaseById(Long id);
/**
* 查询用户监控的路径(资源)列表
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 用户监控的路径(资源)集合
*/
public List<NmyUserWatchBase> selectNmyUserWatchBaseList(NmyUserWatchBase nmyUserWatchBase);
/**
* 新增用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
public int insertNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase);
/**
* 修改用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
public int updateNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase);
/**
* 批量删除用户监控的路径(资源)
*
* @param ids 需要删除的用户监控的路径(资源)主键集合
* @return 结果
*/
public int deleteNmyUserWatchBaseByIds(Long[] ids);
/**
* 删除用户监控的路径(资源)信息
*
* @param id 用户监控的路径(资源)主键
* @return 结果
*/
public int deleteNmyUserWatchBaseById(Long id);
/**
* 获取设备需要监控的资源
*/
public AjaxResult selectNmyUserWatchBaseList(Long userId, Long deviceId);
}
package com.ruoyi.watch.service;
import java.util.List;
import com.ruoyi.watch.domain.NmyUserWatchSubscribe;
import com.ruoyi.websocket.dto.WebSocketMsgDto;
/**
* 订阅的资源Service接口
*
* @author ruoyi
* @date 2025-04-29
*/
public interface INmyUserWatchSubscribeService
{
/**
* 查询订阅的资源
*
* @param id 订阅的资源主键
* @return 订阅的资源
*/
public NmyUserWatchSubscribe selectNmyUserWatchSubscribeById(Long id);
/**
* 查询订阅的资源列表
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 订阅的资源集合
*/
public List<NmyUserWatchSubscribe> selectNmyUserWatchSubscribeList(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 新增订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
public int insertNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 修改订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
public int updateNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe);
/**
* 批量删除订阅的资源
*
* @param ids 需要删除的订阅的资源主键集合
* @return 结果
*/
public int deleteNmyUserWatchSubscribeByIds(Long[] ids);
/**
* 删除订阅的资源信息
*
* @param id 订阅的资源主键
* @return 结果
*/
public int deleteNmyUserWatchSubscribeById(Long id);
/**
* 获取sessionKey,也就是要通知哪些客户端同步
*/
public List<WebSocketMsgDto> getWebSocketSendMsg(Long userId, Long deviceId, String changePath);
/**
* 根据baseId 获取在线的 订阅者
*/
public List<NmyUserWatchSubscribe> selectOnLingSubscriberByBaseId(Long watchBaseId);
}
package com.ruoyi.watch.service.impl;
import java.util.ArrayList;
import java.util.List;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.watch.mapper.NmyUserWatchBaseMapper;
import com.ruoyi.watch.domain.NmyUserWatchBase;
import com.ruoyi.watch.service.INmyUserWatchBaseService;
/**
* 用户监控的路径(资源)Service业务层处理
*
* @author ruoyi
* @date 2025-04-29
*/
@Service
public class NmyUserWatchBaseServiceImpl implements INmyUserWatchBaseService
{
@Autowired
private NmyUserWatchBaseMapper nmyUserWatchBaseMapper;
/**
* 查询用户监控的路径(资源)
*
* @param id 用户监控的路径(资源)主键
* @return 用户监控的路径(资源)
*/
@Override
public NmyUserWatchBase selectNmyUserWatchBaseById(Long id)
{
return nmyUserWatchBaseMapper.selectNmyUserWatchBaseById(id);
}
/**
* 查询用户监控的路径(资源)列表
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 用户监控的路径(资源)
*/
@Override
public List<NmyUserWatchBase> selectNmyUserWatchBaseList(NmyUserWatchBase nmyUserWatchBase)
{
return nmyUserWatchBaseMapper.selectNmyUserWatchBaseList(nmyUserWatchBase);
}
/**
* 新增用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
@Override
public int insertNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase)
{
nmyUserWatchBase.setCreateTime(DateUtils.getNowDate());
return nmyUserWatchBaseMapper.insertNmyUserWatchBase(nmyUserWatchBase);
}
/**
* 修改用户监控的路径(资源)
*
* @param nmyUserWatchBase 用户监控的路径(资源)
* @return 结果
*/
@Override
public int updateNmyUserWatchBase(NmyUserWatchBase nmyUserWatchBase)
{
return nmyUserWatchBaseMapper.updateNmyUserWatchBase(nmyUserWatchBase);
}
/**
* 批量删除用户监控的路径(资源)
*
* @param ids 需要删除的用户监控的路径(资源)主键
* @return 结果
*/
@Override
public int deleteNmyUserWatchBaseByIds(Long[] ids)
{
return nmyUserWatchBaseMapper.deleteNmyUserWatchBaseByIds(ids);
}
/**
* 删除用户监控的路径(资源)信息
*
* @param id 用户监控的路径(资源)主键
* @return 结果
*/
@Override
public int deleteNmyUserWatchBaseById(Long id)
{
return nmyUserWatchBaseMapper.deleteNmyUserWatchBaseById(id);
}
@Override
public AjaxResult selectNmyUserWatchBaseList(Long userId, Long deviceId) {
List<String> list = new ArrayList<String>();
NmyUserWatchBase query = new NmyUserWatchBase();
query.setUserId(userId);
query.setDeviceId(deviceId);
List<NmyUserWatchBase> nmyUserWatchBases = nmyUserWatchBaseMapper.selectNmyUserWatchBaseList(query);
if(nmyUserWatchBases!=null && nmyUserWatchBases.size()>0){
nmyUserWatchBases.forEach(nmyUserWatchBase->{
list.add(nmyUserWatchBase.getWatchPath());
});
}
return AjaxResult.success(list);
}
}
package com.ruoyi.watch.service.impl;
import java.util.ArrayList;
import java.util.List;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.watch.domain.NmyUserWatchBase;
import com.ruoyi.watch.mapper.NmyUserWatchBaseMapper;
import com.ruoyi.websocket.dto.WebSocketMsgDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.watch.mapper.NmyUserWatchSubscribeMapper;
import com.ruoyi.watch.domain.NmyUserWatchSubscribe;
import com.ruoyi.watch.service.INmyUserWatchSubscribeService;
import org.springframework.util.CollectionUtils;
/**
* 订阅的资源Service业务层处理
*
* @author ruoyi
* @date 2025-04-29
*/
@Service
public class NmyUserWatchSubscribeServiceImpl implements INmyUserWatchSubscribeService
{
private static final Logger log = LoggerFactory.getLogger(NmyUserWatchSubscribeServiceImpl.class);
@Autowired
private NmyUserWatchSubscribeMapper nmyUserWatchSubscribeMapper;
@Autowired
private NmyUserWatchBaseMapper nmyUserWatchBaseMapper;
/**
* 查询订阅的资源
*
* @param id 订阅的资源主键
* @return 订阅的资源
*/
@Override
public NmyUserWatchSubscribe selectNmyUserWatchSubscribeById(Long id)
{
return nmyUserWatchSubscribeMapper.selectNmyUserWatchSubscribeById(id);
}
/**
* 查询订阅的资源列表
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 订阅的资源
*/
@Override
public List<NmyUserWatchSubscribe> selectNmyUserWatchSubscribeList(NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
return nmyUserWatchSubscribeMapper.selectNmyUserWatchSubscribeList(nmyUserWatchSubscribe);
}
/**
* 新增订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
@Override
public int insertNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
nmyUserWatchSubscribe.setCreateTime(DateUtils.getNowDate());
return nmyUserWatchSubscribeMapper.insertNmyUserWatchSubscribe(nmyUserWatchSubscribe);
}
/**
* 修改订阅的资源
*
* @param nmyUserWatchSubscribe 订阅的资源
* @return 结果
*/
@Override
public int updateNmyUserWatchSubscribe(NmyUserWatchSubscribe nmyUserWatchSubscribe)
{
return nmyUserWatchSubscribeMapper.updateNmyUserWatchSubscribe(nmyUserWatchSubscribe);
}
/**
* 批量删除订阅的资源
*
* @param ids 需要删除的订阅的资源主键
* @return 结果
*/
@Override
public int deleteNmyUserWatchSubscribeByIds(Long[] ids)
{
return nmyUserWatchSubscribeMapper.deleteNmyUserWatchSubscribeByIds(ids);
}
/**
* 删除订阅的资源信息
*
* @param id 订阅的资源主键
* @return 结果
*/
@Override
public int deleteNmyUserWatchSubscribeById(Long id)
{
return nmyUserWatchSubscribeMapper.deleteNmyUserWatchSubscribeById(id);
}
@Override
public List<NmyUserWatchSubscribe> selectOnLingSubscriberByBaseId(Long watchBaseId) {
return nmyUserWatchSubscribeMapper.selectOnLingSubscriberByBaseId(watchBaseId);
}
/**
* 获取sessionKey,也就是要通知哪些客户端同步
*/
@Override
public List<WebSocketMsgDto> getWebSocketSendMsg(Long userId, Long deviceId, String changePath) {
List<WebSocketMsgDto> result = new ArrayList<WebSocketMsgDto>();
//获取资源
NmyUserWatchBase nmyUserWatchBase = new NmyUserWatchBase();
nmyUserWatchBase.setUserId(userId);
nmyUserWatchBase.setDeviceId(deviceId);
nmyUserWatchBase.setWatchPath(changePath);
List<NmyUserWatchBase> nmyUserWatchBases = nmyUserWatchBaseMapper.selectNmyUserWatchBaseList(nmyUserWatchBase);
if(CollectionUtils.isEmpty(nmyUserWatchBases)){
log.error("userId:{},deviceId:{},changePath:{} , 没有查出来有基础资源不正确",userId,deviceId,changePath);
return result;
}
NmyUserWatchBase nmyUserWatchBase1 = nmyUserWatchBases.get(0);
Long watchBaseId = nmyUserWatchBase1.getId();
//根据watchBaseId获取订阅他的在线的客户端 //TODO 应该加一个机制,不在线的不推送,如果人家上线了怎么整待定是直接同步还是没有变化就不同步
List<NmyUserWatchSubscribe> nmyUserWatchSubscribes = selectOnLingSubscriberByBaseId(watchBaseId);
//遍历返回结果
nmyUserWatchSubscribes.forEach(nmyUserWatchSubscribe -> {
WebSocketMsgDto webSocketMsgDto = new WebSocketMsgDto();
webSocketMsgDto.setSessionKey(nmyUserWatchSubscribe.getUserId()+"_"+nmyUserWatchSubscribe.getDeviceId());
webSocketMsgDto.setClientFilePath(nmyUserWatchSubscribe.getSyncPath());
result.add(webSocketMsgDto);
});
return result;
}
}
package com.ruoyi.websocket;
import com.ruoyi.websocket.handler.SyncNotificationHandler;
import com.ruoyi.websocket.interceptor.CustomHandshakeInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
CustomHandshakeInterceptor customHandshakeInterceptor;
@Autowired
SyncNotificationHandler syncNotificationHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(syncNotificationHandler, "/ws")
.setAllowedOrigins("*")
.addInterceptors(customHandshakeInterceptor);
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 设置发送超时时间为 5 秒
container.setAsyncSendTimeout(5000L);
return container;
}
}
package com.ruoyi.websocket.dto;
import com.ruoyi.file.enums.MsgFileType;
import com.ruoyi.file.enums.MsgHandlerType;
import lombok.Data;
@Data
public class WebSocketMsgDto {
private String sessionKey;
//对应客户端同步的路径 (绝对路径)
private String clientFilePath;
//在服务器存放的位置 (绝对路径)
private String serverFilePath;
private String fileName;
//创建删除修改重命名
// private MsgHandlerType msgHandlerType;
private Integer msgHandlerType;
//文件还是文件夹
// private MsgFileType msgFileType;
private Integer msgFileType;
}
package com.ruoyi.websocket.handler;
import com.ruoyi.common.enums.DeviceOnLineStatus;
import com.ruoyi.device.service.INmyDeviceService;
import com.ruoyi.user.dto.ClientUserOnline;
import com.ruoyi.user.service.WebSocketTokenApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.*;
@Component
public class SyncNotificationHandler extends TextWebSocketHandler {
private static final Logger log = LoggerFactory.getLogger(SyncNotificationHandler.class);
private static final String PING_PONG_MAP_KEY = "pingPongHashMap";
private static final long PING_TIMEOUT = 3000; // 3秒超时
private static final long HEARTBEAT_INTERVAL = 5000; // 5秒心跳间隔
@Autowired
private INmyDeviceService myDeviceService;
@Autowired
private WebSocketTokenApiService tokenApiService;
// 线程安全的会话存储(Key: 客户端ID,Value: WebSocketSession)
private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
// 线程池 - 使用ScheduledThreadPoolExecutor以便更好地控制线程池
private final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(
1,
r -> {
Thread thread = new Thread(r);
thread.setName("ws-heartbeat-thread");
thread.setDaemon(true);
return thread;
}
);
// 每一个sessionKey连接,维护一个调度对象
private static final ConcurrentHashMap<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
public String getSessionKey(ClientUserOnline clientUserOnline) {
return clientUserOnline.getUserId() + "_" + clientUserOnline.getDeviceId();
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
log.info("WebSocket连接建立: {}", session.getId());
try {
String token = tokenApiService.getToken(session);
ClientUserOnline loginUser = tokenApiService.getLoginUser(session);
if (loginUser == null) {
log.error("token:{},创建webSocket连接失败,没有找到登陆用户", token);
session.close(CloseStatus.POLICY_VIOLATION.withReason("未登录"));
return;
}
// 初始化ping/pong映射
ConcurrentHashMap<String, Long> pingPongMap = new ConcurrentHashMap<>();
session.getAttributes().put(PING_PONG_MAP_KEY, pingPongMap);
String sessionKey = getSessionKey(loginUser);
// 移除旧的会话(如果存在)
WebSocketSession oldSession = sessions.put(sessionKey, session);
if (oldSession != null && oldSession.isOpen()) {
try {
oldSession.close(CloseStatus.NORMAL.withReason("新连接建立"));
} catch (IOException e) {
log.warn("关闭旧会话失败: {}", sessionKey, e);
}
}
// 发送连接确认
sendMessage(session, "CONNECTED");
// 启动心跳任务
startHeartbeat(sessionKey, session);
// 设置设备为在线状态
myDeviceService.onLine(loginUser.getDeviceId(), DeviceOnLineStatus.on_line.getCode());
} catch (Exception e) {
log.error("处理WebSocket连接建立时出错", e);
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException ex) {
log.warn("关闭WebSocket会话失败", ex);
}
}
}
private void startHeartbeat(String sessionKey, WebSocketSession session) {
// 清理旧的心跳任务(如果存在)
stopHeartbeat(sessionKey);
// 创建心跳任务
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
try {
if (!session.isOpen()) {
log.debug("会话已关闭,停止心跳: {}", sessionKey);
stopHeartbeat(sessionKey);
return;
}
String pingId = UUID.randomUUID().toString();
String pingMessage = "ping:" + pingId;
ConcurrentHashMap<String, Long> pingPongMap =
(ConcurrentHashMap<String, Long>) session.getAttributes().get(PING_PONG_MAP_KEY);
if (pingPongMap == null) {
log.warn("pingPongMap不存在,停止心跳: {}", sessionKey);
stopHeartbeat(sessionKey);
return;
}
// 记录发送时间
pingPongMap.put(pingId, System.currentTimeMillis());
// 发送ping消息
log.debug("发送心跳ping: {}", pingId);
sendMessage(session, pingMessage);
// 安排超时检查
executorService.schedule(() -> {
if (pingPongMap.containsKey(pingId)) {
log.warn("心跳超时未收到响应: {}", pingId);
pingPongMap.remove(pingId);
closeSession(session, CloseStatus.SESSION_NOT_RELIABLE);
} else {
log.debug("心跳响应正常: {}", pingId);
}
}, PING_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("执行心跳任务时出错: {}", sessionKey, e);
closeSession(session, CloseStatus.SERVER_ERROR);
}
}, 5, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
// 保存任务引用
futureMap.put(sessionKey, future);
}
private void stopHeartbeat(String sessionKey) {
ScheduledFuture<?> future = futureMap.remove(sessionKey);
if (future != null) {
future.cancel(false);
}
}
private void sendMessage(WebSocketSession session, String message) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("发送WebSocket消息失败: {}", message, e);
closeSession(session, CloseStatus.SERVER_ERROR);
}
}
private void closeSession(WebSocketSession session, CloseStatus status) {
try {
if (session.isOpen()) {
session.close(status);
}
} catch (IOException e) {
log.warn("关闭WebSocket会话失败", e);
}
}
@PreDestroy
public void destroy() {
log.info("关闭WebSocket处理器,清理资源");
// 停止所有心跳任务
futureMap.forEach((sessionKey, future) -> {
if (future != null) {
future.cancel(true);
}
});
futureMap.clear();
// 关闭所有会话
sessions.forEach((sessionKey, session) -> {
closeSession(session, CloseStatus.SERVICE_RESTARTED);
});
sessions.clear();
// 关闭线程池
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
log.error("线程池未能正常关闭");
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
log.debug("收到WebSocket消息: {}", payload);
try {
if (payload.startsWith("pong:")) {
handlePongMessage(session, payload);
} else {
// 处理其他类型的消息
handleCustomMessage(session, payload);
}
} catch (Exception e) {
log.error("处理WebSocket消息时出错", e);
closeSession(session, CloseStatus.SERVER_ERROR);
}
}
private void handlePongMessage(WebSocketSession session, String payload) {
String[] parts = payload.split(":");
if (parts.length == 2) {
String pingId = parts[1];
ConcurrentHashMap<String, Long> pingPongMap =
(ConcurrentHashMap<String, Long>) session.getAttributes().get(PING_PONG_MAP_KEY);
if (pingPongMap != null && pingPongMap.containsKey(pingId)) {
long responseTime = System.currentTimeMillis() - pingPongMap.remove(pingId);
log.debug("收到pong响应: {} (响应时间: {}ms)", pingId, responseTime);
} else {
log.warn("收到未知pingId的pong响应: {}", pingId);
}
}
}
private void handleCustomMessage(WebSocketSession session, String payload) {
// 处理自定义消息的逻辑
log.info("收到自定义消息: {}", payload);
// 可以根据需求添加消息处理逻辑
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.info("WebSocket连接关闭: {} (状态: {})", session.getId(), status);
try {
ClientUserOnline loginUser = tokenApiService.getLoginUser(session);
if (loginUser != null) {
String sessionKey = getSessionKey(loginUser);
// 停止心跳任务
stopHeartbeat(sessionKey);
// 移除会话
sessions.remove(sessionKey);
// 设置设备为离线状态
myDeviceService.onLine(loginUser.getDeviceId(), DeviceOnLineStatus.off_line.getCode());
log.info("{}:已离线", sessionKey);
}
} catch (Exception e) {
log.error("处理WebSocket连接关闭时出错", e);
}
}
// 核心方法:向指定客户端发送同步通知
public void notifyClient(String sessionKey, String message) {
WebSocketSession session = sessions.get(sessionKey);
if (session != null && session.isOpen()) {
try {
log.debug("向客户端发送消息: {} => {}", sessionKey, message);
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("向客户端发送消息失败: {}", sessionKey, e);
// 清理无效会话
sessions.remove(sessionKey);
stopHeartbeat(sessionKey);
}
} else {
log.warn("无法向客户端发送消息: 会话不存在或已关闭: {}", sessionKey);
}
}
}
package com.ruoyi.websocket.interceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.List;
import java.util.Map;
//自定义握手阶段,需要进行身份验证 TODO
@Component
public class CustomHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 在这里可以添加自定义的握手处理逻辑,比如身份验证
System.out.println("正在进行握手处理...");
// 获取请求头
org.springframework.http.HttpHeaders headers = request.getHeaders();
// 将请求头信息存储到 attributes 中
attributes.put("headers", headers);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// 握手完成后的处理逻辑
System.out.println("握手完成");
}
}
package com.ruoyi.websocket.service;
import com.google.gson.Gson;
import com.ruoyi.websocket.dto.WebSocketMsgDto;
import com.ruoyi.websocket.handler.SyncNotificationHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Service
public class WebSocketService {
@Autowired
private SyncNotificationHandler syncNotificationHandler;
@Async("webSocketSendMsgThreadPoll")
public void sendMsg(List<WebSocketMsgDto> webSocketSendMsg) {
for (WebSocketMsgDto webSocketMsgDto : webSocketSendMsg) {
Gson gson = new Gson();
String msg = gson.toJson(webSocketMsgDto);
syncNotificationHandler.notifyClient(webSocketMsgDto.getSessionKey(),msg);
}
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.device.mapper.NmyDeviceMapper">
<resultMap type="com.ruoyi.device.domain.NmyDevice" id="NmyDeviceResult">
<result property="id" column="id" />
<result property="userId" column="user_id" />
<result property="deviceFlag" column="device_flag" />
<result property="onLine" column="on_line" />
<result property="token" column="token" />
<result property="deviceIp" column="device_ip" />
<result property="delFlag" column="del_flag" />
<result property="createTime" column="create_time" />
<result property="updateTime" column="update_time" />
<result property="serverPath" column="server_path" />
</resultMap>
<sql id="selectNmyDeviceVo">
select id, user_id, device_flag, on_line, token, device_ip, del_flag, create_time, update_time,server_path from nmy_device
</sql>
<select id="selectNmyDeviceList" parameterType="com.ruoyi.device.domain.NmyDevice" resultMap="NmyDeviceResult">
<include refid="selectNmyDeviceVo"/>
<where>
<if test="userId != null "> and user_id = #{userId}</if>
<if test="onLine != null and onLine != ''"> and on_line = #{onLine}</if>
<if test="token != null and token != ''"> and token = #{token}</if>
<if test="deviceIp != null and deviceIp != ''"> and device_ip = #{deviceIp}</if>
<if test="deviceFlag != null and deviceFlag != ''"> and device_flag = #{deviceFlag}</if>
<if test="serverPath != null and serverPath != ''"> and server_path = #{serverPath}</if>
</where>
and del_flag = '0'
</select>
<select id="selectNmyDeviceById" parameterType="Long" resultMap="NmyDeviceResult">
<include refid="selectNmyDeviceVo"/>
where id = #{id}
</select>
<insert id="insertNmyDevice" parameterType="com.ruoyi.device.domain.NmyDevice" useGeneratedKeys="true" keyProperty="id">
insert into nmy_device
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="userId != null">user_id,</if>
<if test="deviceFlag != null">device_flag,</if>
<if test="onLine != null">on_line,</if>
<if test="token != null">token,</if>
<if test="deviceIp != null">device_ip,</if>
<if test="delFlag != null">del_flag,</if>
<if test="createTime != null">create_time,</if>
<if test="updateTime != null">update_time,</if>
<if test="serverPath != null">server_path,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="userId != null">#{userId},</if>
<if test="deviceFlag != null">#{deviceFlag},</if>
<if test="onLine != null">#{onLine},</if>
<if test="token != null">#{token},</if>
<if test="deviceIp != null">#{deviceIp},</if>
<if test="delFlag != null">#{delFlag},</if>
<if test="createTime != null">#{createTime},</if>
<if test="updateTime != null">#{updateTime},</if>
<if test="serverPath != null">#{serverPath},</if>
</trim>
</insert>
<update id="updateNmyDevice" parameterType="com.ruoyi.device.domain.NmyDevice">
update nmy_device
<trim prefix="SET" suffixOverrides=",">
<if test="userId != null">user_id = #{userId},</if>
<if test="deviceFlag != null">device_flag = #{deviceFlag},</if>
<if test="onLine != null">on_line = #{onLine},</if>
<if test="token != null">token = #{token},</if>
<if test="deviceIp != null">device_ip = #{deviceIp},</if>
<if test="delFlag != null">del_flag = #{delFlag},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="serverPath != null">server_path = #{serverPath},</if>
</trim>
where id = #{id}
</update>
<!-- <delete id="deleteNmyDeviceById" parameterType="Long">-->
<!-- delete from nmy_device where id = #{id}-->
<!-- </delete>-->
<update id="deleteNmyDeviceById" parameterType="Long">
update nmy_device set del_flag = '2' where id = #{id}
</update>
<!-- <delete id="deleteNmyDeviceByIds" parameterType="String">-->
<!-- delete from nmy_device where id in -->
<!-- <foreach item="id" collection="array" open="(" separator="," close=")">-->
<!-- #{id}-->
<!-- </foreach>-->
<!-- </delete>-->
<update id="deleteNmyDeviceByIds" parameterType="String">
update nmy_device set del_flag = '2' where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.user.mapper.NmyUserMapper">
<resultMap type="com.ruoyi.user.domain.NmyUser" id="NmyUserResult">
<result property="id" column="id" />
<result property="phoneNumber" column="phone_number" />
<result property="password" column="password" />
<result property="delFlag" column="del_flag" />
<result property="createTime" column="create_time" />
<result property="updateTime" column="update_time" />
</resultMap>
<sql id="selectNmyUserVo">
select id, phone_number, password, del_flag, create_time, update_time from nmy_user
</sql>
<select id="selectNmyUserList" parameterType="com.ruoyi.user.domain.NmyUser" resultMap="NmyUserResult">
<include refid="selectNmyUserVo"/>
<where>
<if test="phoneNumber != null and phoneNumber != ''"> and phone_number = #{phoneNumber}</if>
<if test="password != null and password != ''"> and password = #{password}</if>
</where>
and del_flag = '0'
</select>
<select id="selectNmyUserById" parameterType="Long" resultMap="NmyUserResult">
<include refid="selectNmyUserVo"/>
where id = #{id}
</select>
<insert id="insertNmyUser" parameterType="com.ruoyi.user.domain.NmyUser" useGeneratedKeys="true" keyProperty="id">
insert into nmy_user
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">id,</if>
<if test="phoneNumber != null">phone_number,</if>
<if test="password != null">password,</if>
<if test="delFlag != null">del_flag,</if>
<if test="createTime != null">create_time,</if>
<if test="updateTime != null">update_time,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">#{id},</if>
<if test="phoneNumber != null">#{phoneNumber},</if>
<if test="password != null">#{password},</if>
<if test="delFlag != null">#{delFlag},</if>
<if test="createTime != null">#{createTime},</if>
<if test="updateTime != null">#{updateTime},</if>
</trim>
</insert>
<update id="updateNmyUser" parameterType="com.ruoyi.user.domain.NmyUser">
update nmy_user
<trim prefix="SET" suffixOverrides=",">
<if test="phoneNumber != null">phone_number = #{phoneNumber},</if>
<if test="password != null">password = #{password},</if>
<if test="delFlag != null">del_flag = #{delFlag},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
</trim>
where id = #{id}
</update>
<!-- <delete id="deleteNmyUserById" parameterType="Long">-->
<!-- delete from nmy_user where id = #{id}-->
<!-- </delete>-->
<update id="deleteNmyUserById" parameterType="Long">
update nmy_user set del_flag = '2' where id = #{id}
</update>
<!-- <delete id="deleteNmyUserByIds" parameterType="String">-->
<!-- delete from nmy_user where id in -->
<!-- <foreach item="id" collection="array" open="(" separator="," close=")">-->
<!-- #{id}-->
<!-- </foreach>-->
<!-- </delete>-->
<update id="deleteNmyUserByIds" parameterType="String">
update nmy_user set del_flag = '2' where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.watch.mapper.NmyUserWatchBaseMapper">
<resultMap type="com.ruoyi.watch.domain.NmyUserWatchBase" id="NmyUserWatchBaseResult">
<result property="id" column="id" />
<result property="watchPath" column="watch_path" />
<result property="userId" column="user_id" />
<result property="deviceId" column="device_id" />
<result property="delFlag" column="del_flag" />
<result property="createTime" column="create_time" />
</resultMap>
<sql id="selectNmyUserWatchBaseVo">
select id, watch_path, user_id, device_id, del_flag, create_time from nmy_user_watch_base
</sql>
<select id="selectNmyUserWatchBaseList" parameterType="com.ruoyi.watch.domain.NmyUserWatchBase" resultMap="NmyUserWatchBaseResult">
<include refid="selectNmyUserWatchBaseVo"/>
<where>
<if test="watchPath != null and watchPath != ''"> and watch_path = #{watchPath}</if>
<if test="userId != null "> and user_id = #{userId}</if>
<if test="deviceId != null "> and device_id = #{deviceId}</if>
</where>
and del_flag = '0'
</select>
<select id="selectNmyUserWatchBaseById" parameterType="Long" resultMap="NmyUserWatchBaseResult">
<include refid="selectNmyUserWatchBaseVo"/>
where id = #{id}
</select>
<insert id="insertNmyUserWatchBase" parameterType="com.ruoyi.watch.domain.NmyUserWatchBase">
insert into nmy_user_watch_base
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">id,</if>
<if test="watchPath != null">watch_path,</if>
<if test="userId != null">user_id,</if>
<if test="deviceId != null">device_id,</if>
<if test="delFlag != null">del_flag,</if>
<if test="createTime != null">create_time,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">#{id},</if>
<if test="watchPath != null">#{watchPath},</if>
<if test="userId != null">#{userId},</if>
<if test="deviceId != null">#{deviceId},</if>
<if test="delFlag != null">#{delFlag},</if>
<if test="createTime != null">#{createTime},</if>
</trim>
</insert>
<update id="updateNmyUserWatchBase" parameterType="com.ruoyi.watch.domain.NmyUserWatchBase">
update nmy_user_watch_base
<trim prefix="SET" suffixOverrides=",">
<if test="watchPath != null">watch_path = #{watchPath},</if>
<if test="userId != null">user_id = #{userId},</if>
<if test="deviceId != null">device_id = #{deviceId},</if>
<if test="delFlag != null">del_flag = #{delFlag},</if>
<if test="createTime != null">create_time = #{createTime},</if>
</trim>
where id = #{id}
</update>
<!-- <delete id="deleteNmyUserWatchBaseById" parameterType="Long">-->
<!-- delete from nmy_user_watch_base where id = #{id}-->
<!-- </delete>-->
<update id="deleteNmyUserWatchBaseById" parameterType="Long">
update nmy_user_watch_base set del_flag = '2' where id = #{id}
</update>
<!-- <delete id="deleteNmyUserWatchBaseByIds" parameterType="String">-->
<!-- delete from nmy_user_watch_base where id in -->
<!-- <foreach item="id" collection="array" open="(" separator="," close=")">-->
<!-- #{id}-->
<!-- </foreach>-->
<!-- </delete>-->
<update id="deleteNmyUserWatchBaseByIds" parameterType="String">
update nmy_user_watch_base set del_flag = '2' where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.watch.mapper.NmyUserWatchSubscribeMapper">
<resultMap type="com.ruoyi.watch.domain.NmyUserWatchSubscribe" id="NmyUserWatchSubscribeResult">
<result property="id" column="id" />
<result property="watchBaseId" column="watch_base_id" />
<result property="userId" column="user_id" />
<result property="deviceId" column="device_id" />
<result property="syncPath" column="sync_path" />
<result property="createTime" column="create_time" />
<result property="delFlag" column="del_flag" />
</resultMap>
<sql id="selectNmyUserWatchSubscribeVo">
select id, watch_base_id, user_id, device_id, sync_path, create_time, del_flag from nmy_user_watch_subscribe
</sql>
<select id="selectNmyUserWatchSubscribeList" parameterType="com.ruoyi.watch.domain.NmyUserWatchSubscribe" resultMap="NmyUserWatchSubscribeResult">
<include refid="selectNmyUserWatchSubscribeVo"/>
<where>
<if test="watchBaseId != null "> and watch_base_id = #{watchBaseId}</if>
<if test="userId != null "> and user_id = #{userId}</if>
<if test="deviceId != null "> and device_id = #{deviceId}</if>
<if test="syncPath != null and syncPath != ''"> and sync_path = #{syncPath}</if>
</where>
and del_flag = '0'
</select>
<select id="selectOnLingSubscriberByBaseId" parameterType="long" resultMap="NmyUserWatchSubscribeResult">
SELECT
ws.id,
ws.watch_base_id,
ws.user_id,
ws.device_id,
ws.sync_path,
ws.create_time
FROM
nmy_user_watch_subscribe ws
LEFT JOIN nmy_user_watch_base wb ON ws.watch_base_id = wb.id
LEFT JOIN nmy_device d on d.id = ws.device_id
WHERE
ws.watch_base_id = #{watchBaseId} and d.on_line = '1'
</select>
<select id="selectNmyUserWatchSubscribeById" parameterType="Long" resultMap="NmyUserWatchSubscribeResult">
<include refid="selectNmyUserWatchSubscribeVo"/>
where id = #{id}
</select>
<insert id="insertNmyUserWatchSubscribe" parameterType="com.ruoyi.watch.domain.NmyUserWatchSubscribe">
insert into nmy_user_watch_subscribe
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">id,</if>
<if test="watchBaseId != null">watch_base_id,</if>
<if test="userId != null">user_id,</if>
<if test="deviceId != null">device_id,</if>
<if test="syncPath != null">sync_path,</if>
<if test="createTime != null">create_time,</if>
<if test="delFlag != null">del_flag,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">#{id},</if>
<if test="watchBaseId != null">#{watchBaseId},</if>
<if test="userId != null">#{userId},</if>
<if test="deviceId != null">#{deviceId},</if>
<if test="syncPath != null">#{syncPath},</if>
<if test="createTime != null">#{createTime},</if>
<if test="delFlag != null">#{delFlag},</if>
</trim>
</insert>
<update id="updateNmyUserWatchSubscribe" parameterType="com.ruoyi.watch.domain.NmyUserWatchSubscribe">
update nmy_user_watch_subscribe
<trim prefix="SET" suffixOverrides=",">
<if test="watchBaseId != null">watch_base_id = #{watchBaseId},</if>
<if test="userId != null">user_id = #{userId},</if>
<if test="deviceId != null">device_id = #{deviceId},</if>
<if test="syncPath != null">sync_path = #{syncPath},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="delFlag != null">del_flag = #{delFlag},</if>
</trim>
where id = #{id}
</update>
<!-- <delete id="deleteNmyUserWatchSubscribeById" parameterType="Long">-->
<!-- delete from nmy_user_watch_subscribe where id = #{id}-->
<!-- </delete>-->
<update id="deleteNmyUserWatchSubscribeById" parameterType="Long">
update nmy_user_watch_subscribe set del_flag = '2' where id = #{id}
</update>
<!-- <delete id="deleteNmyUserWatchSubscribeByIds" parameterType="String">-->
<!-- delete from nmy_user_watch_subscribe where id in -->
<!-- <foreach item="id" collection="array" open="(" separator="," close=")">-->
<!-- #{id}-->
<!-- </foreach>-->
<!-- </delete>-->
<update id="deleteNmyUserWatchSubscribeByIds" parameterType="String">
update nmy_user_watch_subscribe set del_flag = '2' where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment