|
@@ -1,6 +1,12 @@
|
|
package com.backendsys.modules.sdk.comfyui.service.impl;
|
|
package com.backendsys.modules.sdk.comfyui.service.impl;
|
|
|
|
|
|
|
|
+import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
|
|
|
|
+import com.backendsys.modules.common.config.security.utils.SecurityUtil;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUISocketService;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUISocketService;
|
|
|
|
+import com.backendsys.modules.sse.entity.SseResponse;
|
|
|
|
+import com.backendsys.modules.sse.entity.SseResponseEnum;
|
|
|
|
+import com.backendsys.modules.sse.utils.SseUtil;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.web.reactive.socket.WebSocketMessage;
|
|
import org.springframework.web.reactive.socket.WebSocketMessage;
|
|
@@ -19,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
@Service
|
|
@Service
|
|
public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
+ private SseUtil sseUtil;
|
|
|
|
+
|
|
@Value("${comfyui.token}")
|
|
@Value("${comfyui.token}")
|
|
private String COMFYUI_TOKEN;
|
|
private String COMFYUI_TOKEN;
|
|
|
|
|
|
@@ -41,32 +50,23 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
|
|
|
|
/**
|
|
/**
|
|
* [ComfyUI] 创建 WebSocket 监听连接
|
|
* [ComfyUI] 创建 WebSocket 监听连接
|
|
- * @param clientId 客户端ID(用于标识连接)
|
|
|
|
- * @param wsUrl WebSocket 地址
|
|
|
|
- * @return Mono<Void> 表示连接操作
|
|
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Mono<Void> connect(String clientId, String wsUrl) {
|
|
public Mono<Void> connect(String clientId, String wsUrl) {
|
|
-
|
|
|
|
return Mono.defer(() -> {
|
|
return Mono.defer(() -> {
|
|
if (sessions.containsKey(clientId)) {
|
|
if (sessions.containsKey(clientId)) {
|
|
return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
}
|
|
}
|
|
-
|
|
|
|
// 动态创建带有认证头的客户端
|
|
// 动态创建带有认证头的客户端
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
// 保存会话
|
|
// 保存会话
|
|
sessions.put(clientId, session);
|
|
sessions.put(clientId, session);
|
|
-
|
|
|
|
// 接收消息
|
|
// 接收消息
|
|
Flux<String> incomingMessages = session.receive()
|
|
Flux<String> incomingMessages = session.receive()
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.doOnNext(message -> {
|
|
.doOnNext(message -> {
|
|
-
|
|
|
|
System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
-// // 转发到消息总线
|
|
|
|
-// messageSink.tryEmitNext(message);
|
|
|
|
})
|
|
})
|
|
.doOnError(e -> {
|
|
.doOnError(e -> {
|
|
System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
@@ -75,12 +75,54 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
sessions.remove(clientId);
|
|
sessions.remove(clientId);
|
|
});
|
|
});
|
|
-
|
|
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
return incomingMessages.then();
|
|
return incomingMessages.then();
|
|
});
|
|
});
|
|
});
|
|
});
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * [ComfyUI] 创建 WebSocket 监听连接 (转发到 SSE)
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public Mono<Void> connectToSse(String clientId, String wsUrl) {
|
|
|
|
+
|
|
|
|
+ Long user_id = SecurityUtil.getUserId();
|
|
|
|
+
|
|
|
|
+ return Mono.defer(() -> {
|
|
|
|
+ if (sessions.containsKey(clientId)) {
|
|
|
|
+ return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
|
|
+ }
|
|
|
|
+ // 动态创建带有认证头的客户端
|
|
|
|
+ WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
|
|
+ return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
|
|
+ // 保存会话
|
|
|
|
+ sessions.put(clientId, session);
|
|
|
|
+ // 接收消息
|
|
|
|
+
|
|
|
|
+// // [SSE] 发送消息
|
|
|
|
+// ChatSseMessage chatSearchSseMessage = new ChatSseMessage("SEARCH", context, null, history_code);
|
|
|
|
|
|
|
|
+
|
|
|
|
+ Flux<String> incomingMessages = session.receive()
|
|
|
|
+ .map(WebSocketMessage::getPayloadAsText)
|
|
|
|
+ .doOnNext(message -> {
|
|
|
|
+ System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
|
|
+ sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, message).toJsonStr());
|
|
|
|
+ })
|
|
|
|
+ .doOnError(e -> {
|
|
|
|
+ System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
|
|
+ sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, e.getMessage()).toJsonStr());
|
|
|
|
+ })
|
|
|
|
+ .doFinally(signal -> {
|
|
|
|
+ System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
|
|
+ sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, signal).toJsonStr());
|
|
|
|
+ sessions.remove(clientId);
|
|
|
|
+ });
|
|
|
|
+ // 需要返回一个Mono<Void>来表示处理完成
|
|
|
|
+ return incomingMessages.then();
|
|
|
|
+ });
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -94,6 +136,12 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
WebSocketSession session = sessions.get(clientId);
|
|
WebSocketSession session = sessions.get(clientId);
|
|
if (session != null) {
|
|
if (session != null) {
|
|
System.out.println("disconnect success! clientId: " + clientId);
|
|
System.out.println("disconnect success! clientId: " + clientId);
|
|
|
|
+
|
|
|
|
+ Long user_id = SecurityUtil.getUserId();
|
|
|
|
+ if (user_id != null) {
|
|
|
|
+ sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, "disconnect success! clientId: " + clientId).toJsonStr());
|
|
|
|
+ }
|
|
|
|
+
|
|
session.close().subscribe();
|
|
session.close().subscribe();
|
|
sessions.remove(clientId);
|
|
sessions.remove(clientId);
|
|
}
|
|
}
|