|
@@ -4,18 +4,14 @@ import cn.hutool.core.convert.Convert;
|
|
import cn.hutool.json.JSONArray;
|
|
import cn.hutool.json.JSONArray;
|
|
import cn.hutool.json.JSONObject;
|
|
import cn.hutool.json.JSONObject;
|
|
import cn.hutool.json.JSONUtil;
|
|
import cn.hutool.json.JSONUtil;
|
|
-import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
|
|
|
|
import com.backendsys.modules.common.config.security.utils.SecurityUtil;
|
|
import com.backendsys.modules.common.config.security.utils.SecurityUtil;
|
|
import com.backendsys.modules.crt.dao.CrtGenerateImageDao;
|
|
import com.backendsys.modules.crt.dao.CrtGenerateImageDao;
|
|
import com.backendsys.modules.crt.entity.CrtGenerateImage;
|
|
import com.backendsys.modules.crt.entity.CrtGenerateImage;
|
|
import com.backendsys.modules.sdk.comfyui.enums.TypeEnums;
|
|
import com.backendsys.modules.sdk.comfyui.enums.TypeEnums;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUISocketService;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUISocketService;
|
|
-import com.backendsys.modules.sdk.douyincloud.tos.service.DouyinTosService;
|
|
|
|
-import com.backendsys.modules.sdk.tencentcloud.cos.service.TencentCosService;
|
|
|
|
import com.backendsys.modules.sse.entity.SseResponse;
|
|
import com.backendsys.modules.sse.entity.SseResponse;
|
|
import com.backendsys.modules.sse.entity.SseResponseEnum;
|
|
import com.backendsys.modules.sse.entity.SseResponseEnum;
|
|
import com.backendsys.modules.sse.utils.SseUtil;
|
|
import com.backendsys.modules.sse.utils.SseUtil;
|
|
-import com.backendsys.modules.system.service.SysCommonService;
|
|
|
|
import com.backendsys.modules.upload.entity.SysFileResult;
|
|
import com.backendsys.modules.upload.entity.SysFileResult;
|
|
import com.backendsys.modules.upload.service.SysFileService;
|
|
import com.backendsys.modules.upload.service.SysFileService;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -32,14 +28,13 @@ import reactor.netty.http.client.HttpClient;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Arrays;
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
@Service
|
|
@Service
|
|
-public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
|
|
|
|
+public class ComfyuiSocketServiceImpl implements ComfyUISocketService {
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
private SseUtil sseUtil;
|
|
private SseUtil sseUtil;
|
|
@@ -74,30 +69,30 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
* [ComfyUI] 创建 WebSocket 监听连接
|
|
* [ComfyUI] 创建 WebSocket 监听连接
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public Mono<Void> connect(String clientId, Integer port) {
|
|
|
|
|
|
+ public Mono<Void> connect(String client_id, Integer port) {
|
|
|
|
|
|
String wsUrl = "ws://" + COMFYUI_HOST + ":" + port + "/ws";
|
|
String wsUrl = "ws://" + COMFYUI_HOST + ":" + port + "/ws";
|
|
return Mono.defer(() -> {
|
|
return Mono.defer(() -> {
|
|
- if (sessions.containsKey(clientId)) {
|
|
|
|
- return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
|
|
|
|
+ if (sessions.containsKey(client_id)) {
|
|
|
|
+ return Mono.error(new IllegalStateException("Connection already exists for client: " + client_id));
|
|
}
|
|
}
|
|
// 动态创建带有认证头的客户端
|
|
// 动态创建带有认证头的客户端
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
- return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
|
|
|
|
+ return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + client_id), session -> {
|
|
// 保存会话
|
|
// 保存会话
|
|
- sessions.put(clientId, session);
|
|
|
|
|
|
+ sessions.put(client_id, session);
|
|
// 接收消息
|
|
// 接收消息
|
|
Flux<String> incomingMessages = session.receive()
|
|
Flux<String> incomingMessages = session.receive()
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.doOnNext(message -> {
|
|
.doOnNext(message -> {
|
|
- System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
|
|
|
|
+ System.out.println("(doOnNext) Received from " + client_id + ": " + message);
|
|
})
|
|
})
|
|
.doOnError(e -> {
|
|
.doOnError(e -> {
|
|
- System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
|
|
|
|
+ System.err.println("(doOnError) Error for " + client_id + ": " + e.getMessage());
|
|
})
|
|
})
|
|
.doFinally(signal -> {
|
|
.doFinally(signal -> {
|
|
- System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
|
|
- sessions.remove(clientId);
|
|
|
|
|
|
+ System.out.println("(doFinally) Connection closed for " + client_id + ": " + signal);
|
|
|
|
+ sessions.remove(client_id);
|
|
});
|
|
});
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
return incomingMessages.then();
|
|
return incomingMessages.then();
|
|
@@ -109,120 +104,72 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
* [ComfyUI] 创建 WebSocket 监听连接 (转发到 SSE)
|
|
* [ComfyUI] 创建 WebSocket 监听连接 (转发到 SSE)
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public Mono<Void> connectToSse(String clientId, Integer port) {
|
|
|
|
- return connectToSse(clientId, port, null);
|
|
|
|
|
|
+ public Mono<Void> connectToSse(String client_id, Integer port) {
|
|
|
|
+ return connectToSse(client_id, port, null);
|
|
}
|
|
}
|
|
@Override
|
|
@Override
|
|
- public Mono<Void> connectToSse(String clientId, Integer port, Map<String, Object> params) {
|
|
|
|
|
|
+ public Mono<Void> connectToSse(String client_id, Integer port, Map<String, Object> params) {
|
|
|
|
|
|
- CrtGenerateImage entity = new CrtGenerateImage();
|
|
|
|
-
|
|
|
|
- // Websocket 获取不到上下文信息,所以 user_id 需要从外部传递
|
|
|
|
|
|
+ // 由于 Websocket 获取不到上下文信息,所以 user_id 不能在 socket 周期获取
|
|
Long user_id = SecurityUtil.getUserId();
|
|
Long user_id = SecurityUtil.getUserId();
|
|
- entity.setUser_id(user_id);
|
|
|
|
|
|
|
|
String wsUrl = "ws://" + COMFYUI_HOST + ":" + port + "/ws";
|
|
String wsUrl = "ws://" + COMFYUI_HOST + ":" + port + "/ws";
|
|
return Mono.defer(() -> {
|
|
return Mono.defer(() -> {
|
|
- if (sessions.containsKey(clientId)) {
|
|
|
|
- return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
|
|
|
|
+ if (sessions.containsKey(client_id)) {
|
|
|
|
+ return Mono.error(new IllegalStateException("Connection already exists for client: " + client_id));
|
|
}
|
|
}
|
|
// 动态创建带有认证头的客户端
|
|
// 动态创建带有认证头的客户端
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
- return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
|
|
|
|
+ return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + client_id), session -> {
|
|
// 保存会话
|
|
// 保存会话
|
|
- sessions.put(clientId, session);
|
|
|
|
|
|
+ sessions.put(client_id, session);
|
|
// 接收消息
|
|
// 接收消息
|
|
|
|
|
|
System.out.println("------ wsUrl: " + wsUrl + " ------");
|
|
System.out.println("------ wsUrl: " + wsUrl + " ------");
|
|
- System.out.println("------ connectToSse clientId: " + clientId + ", user_id: " + user_id + " ------");
|
|
|
|
|
|
+ System.out.println("------ connectToSse client_id: " + client_id + ", user_id: " + user_id + " ------");
|
|
|
|
|
|
Flux<String> incomingMessages = session.receive()
|
|
Flux<String> incomingMessages = session.receive()
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
.doOnNext(message -> {
|
|
.doOnNext(message -> {
|
|
- System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
|
|
|
|
+ System.out.println("(doOnNext) Received from " + client_id + ": " + message);
|
|
|
|
|
|
JSONObject data = JSONUtil.parseObj(message);
|
|
JSONObject data = JSONUtil.parseObj(message);
|
|
- String type = Convert.toStr(data.get("type"));
|
|
|
|
|
|
|
|
- // == [任务执行成功] =======================================================
|
|
|
|
- // { "type": "executed", "data": { "output": { "images": [{ "filename": "ComfyUI_00117_.png" }] } } }
|
|
|
|
|
|
+ // == [任务执行完成] =======================================================
|
|
|
|
+ // { "type": "executed", .. }
|
|
|
|
+ String type = Convert.toStr(data.get("type"));
|
|
if (TypeEnums.EXECUTED.getValue().equals(type)) {
|
|
if (TypeEnums.EXECUTED.getValue().equals(type)) {
|
|
|
|
|
|
JSONObject dataChildren = JSONUtil.parseObj(data.get("data"));
|
|
JSONObject dataChildren = JSONUtil.parseObj(data.get("data"));
|
|
JSONObject output = JSONUtil.parseObj(dataChildren.get("output"));
|
|
JSONObject output = JSONUtil.parseObj(dataChildren.get("output"));
|
|
|
|
|
|
- // -- [生成图片] ------------------------------------------------------
|
|
|
|
|
|
+ // -- [生成图片 -> 转存图片 -> 新增记录] ------------------------------------------------------
|
|
// 由于图片地址不是公开的,需要加 Token 访问,因此不能公开返回原始图片地址,比如:
|
|
// 由于图片地址不是公开的,需要加 Token 访问,因此不能公开返回原始图片地址,比如:
|
|
// http://43.128.1.201:8000/api/view?filename=fenjing_1_00012_.png&token=$2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
|
|
// http://43.128.1.201:8000/api/view?filename=fenjing_1_00012_.png&token=$2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
|
|
-
|
|
|
|
|
|
+ // 因此,直接在 ComfyUI 的物理目录上构建 nginx 静态资源访问目录
|
|
Object imagesObj = output.get("images");
|
|
Object imagesObj = output.get("images");
|
|
if (imagesObj != null) {
|
|
if (imagesObj != null) {
|
|
- JSONArray images = JSONUtil.parseArray(imagesObj);
|
|
|
|
- // [{"filename": "ComfyUI_00122_.png", "subfolder": "", "type": "output"}]
|
|
|
|
- // http://43.128.1.201:8001/api/view?filename=ComfyUI_00117_.png
|
|
|
|
- // http://43.128.1.201:8001/api/view?filename=ComfyUI_00117_.png&preview=1
|
|
|
|
- List<String> images_path = new ArrayList<>();
|
|
|
|
- if (images.size() > 0) {
|
|
|
|
- for (int i = 0; i < images.size(); i++) {
|
|
|
|
- JSONObject image = images.getJSONObject(i);
|
|
|
|
- String filename = image.getStr("filename");
|
|
|
|
-
|
|
|
|
- // ComfyUI + Nginx 输出域名转发
|
|
|
|
-// String filepath = "http://" + COMFYUI_HOST + ":" + port + "/api/view?filename=" + filename;
|
|
|
|
-// String filepath_with_token = filepath + "&token=" + COMFYUI_TOKEN;
|
|
|
|
-
|
|
|
|
- // ComfyUI 输出的临时目录
|
|
|
|
- String filepath = "http://o.daogu.ai/" + port + "/" + filename;
|
|
|
|
-
|
|
|
|
- // -- [图片转存储存桶] -------------------------------------
|
|
|
|
- SysFileResult result = sysFileService.urlToUploadFile(filepath, user_id);
|
|
|
|
- System.out.println("urlToUploadFile (result) = " + result);
|
|
|
|
-
|
|
|
|
- // -- [记录到生成图片记录表] --------------------------------
|
|
|
|
- if (params != null) {
|
|
|
|
-
|
|
|
|
- // 创建一个 CompletableFuture 来执行异步任务
|
|
|
|
- CompletableFuture.runAsync(() -> {
|
|
|
|
-
|
|
|
|
- Long drama_project_storyboard_id = Convert.toLong(params.get("drama_project_storyboard_id"));
|
|
|
|
- String prompt_id = Convert.toStr(dataChildren.get("prompt_id"));
|
|
|
|
- if (drama_project_storyboard_id != null) {
|
|
|
|
- entity.setDrama_project_storyboard_id(drama_project_storyboard_id);
|
|
|
|
- entity.setPrompt_id(prompt_id);
|
|
|
|
- entity.setUrl_origin(filepath);
|
|
|
|
- entity.setUrl(result.getUrl());
|
|
|
|
- entity.setTarget(result.getTarget());
|
|
|
|
- crtGenerateImageDao.insert(entity);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- // ------------------------------------------------------
|
|
|
|
-
|
|
|
|
- images_path.add(filepath);
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ String prompt_id = Convert.toStr(dataChildren.get("prompt_id"));
|
|
|
|
+ Long drama_project_storyboard_id = Convert.toLong(params.get("drama_project_storyboard_id"));
|
|
|
|
+ List<String> images_path = imagesToRecord(user_id, prompt_id, port, imagesObj, drama_project_storyboard_id);
|
|
output.put("images_path", images_path);
|
|
output.put("images_path", images_path);
|
|
dataChildren.put("output", output);
|
|
dataChildren.put("output", output);
|
|
data.put("data", dataChildren);
|
|
data.put("data", dataChildren);
|
|
}
|
|
}
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
|
|
|
}
|
|
}
|
|
- // ======================================================================
|
|
|
|
|
|
+ // ========================================================================
|
|
|
|
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, data).toJsonStr());
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, data).toJsonStr());
|
|
})
|
|
})
|
|
.doOnError(e -> {
|
|
.doOnError(e -> {
|
|
- System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
|
|
|
|
+ System.err.println("(doOnError) Error for " + client_id + ": " + e.getMessage());
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, e.getMessage()).toJsonStr());
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, e.getMessage()).toJsonStr());
|
|
})
|
|
})
|
|
.doFinally(signal -> {
|
|
.doFinally(signal -> {
|
|
- System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
|
|
|
|
+ System.out.println("(doFinally) Connection closed for " + client_id + ": " + signal);
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, signal).toJsonStr());
|
|
sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, signal).toJsonStr());
|
|
- sessions.remove(clientId);
|
|
|
|
|
|
+ sessions.remove(client_id);
|
|
System.out.println("---------------------------------------------------------");
|
|
System.out.println("---------------------------------------------------------");
|
|
});
|
|
});
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
// 需要返回一个Mono<Void>来表示处理完成
|
|
@@ -233,23 +180,23 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
|
|
|
|
/**
|
|
/**
|
|
* [ComfyUI] 断开 WebSocket 监听连接
|
|
* [ComfyUI] 断开 WebSocket 监听连接
|
|
- * @param clientId 客户端ID
|
|
|
|
|
|
+ * @param client_id 客户端ID
|
|
* @return Mono<Void> 表示断开操作
|
|
* @return Mono<Void> 表示断开操作
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public Mono<Void> disconnect(String clientId) {
|
|
|
|
|
|
+ public Mono<Void> disconnect(String client_id) {
|
|
return Mono.fromRunnable(() -> {
|
|
return Mono.fromRunnable(() -> {
|
|
- WebSocketSession session = sessions.get(clientId);
|
|
|
|
|
|
+ WebSocketSession session = sessions.get(client_id);
|
|
if (session != null) {
|
|
if (session != null) {
|
|
- System.out.println("disconnect success! clientId: " + clientId);
|
|
|
|
|
|
+ System.out.println("disconnect success! client_id: " + client_id);
|
|
|
|
|
|
Long user_id = SecurityUtil.getUserId();
|
|
Long user_id = SecurityUtil.getUserId();
|
|
if (user_id != null) {
|
|
if (user_id != null) {
|
|
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, "disconnect success! clientId: " + clientId).toJsonStr());
|
|
|
|
|
|
+ sseUtil.send(user_id, new SseResponse(SseResponseEnum.COMFYUI, "disconnect success! client_id: " + client_id).toJsonStr());
|
|
}
|
|
}
|
|
|
|
|
|
session.close().subscribe();
|
|
session.close().subscribe();
|
|
- sessions.remove(clientId);
|
|
|
|
|
|
+ sessions.remove(client_id);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@@ -262,4 +209,43 @@ public class ComfyUISocketServiceImpl implements ComfyUISocketService {
|
|
return Flux.fromIterable(sessions.keySet());
|
|
return Flux.fromIterable(sessions.keySet());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // 转存图片
|
|
|
|
+ private List<String> imagesToRecord(Long user_id, String prompt_id, Integer port, Object imagesObj, Long drama_project_storyboard_id) {
|
|
|
|
+ JSONArray images = JSONUtil.parseArray(imagesObj);
|
|
|
|
+ List<String> images_path = new ArrayList<>();
|
|
|
|
+ if (images.size() > 0) {
|
|
|
|
+ for (int i = 0; i < images.size(); i++) {
|
|
|
|
+ JSONObject image = images.getJSONObject(i);
|
|
|
|
+ String filename = image.getStr("filename");
|
|
|
|
+ String filepath = "http://o.daogu.ai/" + port + "/" + filename; // 生成图片临时目录
|
|
|
|
+
|
|
|
|
+ // -- [图片转存储存桶] -------------------------------------
|
|
|
|
+ SysFileResult result = sysFileService.urlToUploadFile(filepath, user_id);
|
|
|
|
+ System.out.println("urlToUploadFile (result) = " + result);
|
|
|
|
+
|
|
|
|
+ // -- [记录到生成图片记录表] --------------------------------
|
|
|
|
+ // 创建一个 CompletableFuture 来执行异步任务
|
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
|
+
|
|
|
|
+ if (drama_project_storyboard_id != null) {
|
|
|
|
+ CrtGenerateImage entity = new CrtGenerateImage();
|
|
|
|
+ entity.setUser_id(user_id);
|
|
|
|
+ entity.setDrama_project_storyboard_id(drama_project_storyboard_id);
|
|
|
|
+ entity.setPrompt_id(prompt_id);
|
|
|
|
+ entity.setUrl_origin(filepath);
|
|
|
|
+ entity.setUrl(result.getUrl());
|
|
|
|
+ entity.setTarget(result.getTarget());
|
|
|
|
+ crtGenerateImageDao.insert(entity);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ // ------------------------------------------------------
|
|
|
|
+
|
|
|
|
+ images_path.add(filepath);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return images_path;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|