|
@@ -1,121 +1,65 @@
|
|
package com.backendsys.modules.sdk.comfyui.service.impl;
|
|
package com.backendsys.modules.sdk.comfyui.service.impl;
|
|
|
|
|
|
|
|
+import cn.hutool.core.convert.Convert;
|
|
|
|
+import com.backendsys.modules.common.Filter.WebClientFilter;
|
|
|
|
+import com.backendsys.modules.sdk.comfyui.entity.CFPromptResponse;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUIService;
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyUIService;
|
|
-import com.tencentcloudapi.tione.v20211111.models.ChatCompletionResponse;
|
|
|
|
-import org.apache.http.client.methods.HttpPost;
|
|
|
|
-import org.apache.http.impl.client.CloseableHttpClient;
|
|
|
|
-import org.apache.http.impl.client.HttpClients;
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.http.MediaType;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
-import org.springframework.web.reactive.socket.WebSocketMessage;
|
|
|
|
-import org.springframework.web.reactive.socket.WebSocketSession;
|
|
|
|
-import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
|
|
|
|
-import org.springframework.web.reactive.socket.client.WebSocketClient;
|
|
|
|
-import reactor.core.publisher.Flux;
|
|
|
|
-import reactor.core.publisher.FluxSink;
|
|
|
|
|
|
+import org.springframework.util.LinkedMultiValueMap;
|
|
|
|
+import org.springframework.util.MultiValueMap;
|
|
|
|
+import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
+import org.springframework.web.util.UriComponentsBuilder;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
-import reactor.netty.http.client.HttpClient;
|
|
|
|
-import reactor.util.retry.Retry;
|
|
|
|
|
|
|
|
-import java.net.URI;
|
|
|
|
-import java.nio.ByteBuffer;
|
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
|
-import java.time.Duration;
|
|
|
|
-import java.util.Base64;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
+import java.util.UUID;
|
|
|
|
|
|
@Service
|
|
@Service
|
|
public class ComfyUIServiceImpl implements ComfyUIService {
|
|
public class ComfyUIServiceImpl implements ComfyUIService {
|
|
|
|
|
|
- @Value("${comfyui.token}")
|
|
|
|
- private String COMFYUI_TOKEN = "";
|
|
|
|
-
|
|
|
|
-// // 单例 WebSocketClient(线程安全)
|
|
|
|
-// private WebSocketClient webSocketClient;
|
|
|
|
-
|
|
|
|
- // 管理多个连接
|
|
|
|
- private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 创建带有认证头的 WebSocketClient
|
|
|
|
- * @param token 认证令牌
|
|
|
|
- * @return 配置好的 WebSocketClient
|
|
|
|
- */
|
|
|
|
- private WebSocketClient createWebSocketClientWithToken(String token) {
|
|
|
|
- HttpClient httpClient = HttpClient.create()
|
|
|
|
- .headers(headers -> headers.add("Authorization", "Bearer " + token))
|
|
|
|
- .responseTimeout(Duration.ofSeconds(30)); // 30秒超时
|
|
|
|
- return new ReactorNettyWebSocketClient(httpClient);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * [ComfyUI] 创建 WebSocket 监听连接
|
|
|
|
- * @param clientId 客户端ID(用于标识连接)
|
|
|
|
- * @param wsUrl WebSocket 地址
|
|
|
|
- * @return Mono<Void> 表示连接操作
|
|
|
|
- */
|
|
|
|
- @Override
|
|
|
|
- public Mono<Void> connect(String clientId, String wsUrl) {
|
|
|
|
-
|
|
|
|
- return Mono.defer(() -> {
|
|
|
|
- if (sessions.containsKey(clientId)) {
|
|
|
|
- return Mono.error(new IllegalStateException("Connection already exists for client: " + clientId));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 动态创建带有认证头的客户端
|
|
|
|
- WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
|
|
- return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + clientId), session -> {
|
|
|
|
- // 保存会话
|
|
|
|
- sessions.put(clientId, session);
|
|
|
|
-
|
|
|
|
- // 接收消息
|
|
|
|
- Flux<String> incomingMessages = session.receive()
|
|
|
|
- .map(WebSocketMessage::getPayloadAsText)
|
|
|
|
- .doOnNext(message -> {
|
|
|
|
-
|
|
|
|
- System.out.println("(doOnNext) Received from " + clientId + ": " + message);
|
|
|
|
-// // 转发到消息总线
|
|
|
|
-// messageSink.tryEmitNext(message);
|
|
|
|
- })
|
|
|
|
- .doOnError(e -> {
|
|
|
|
- System.err.println("(doOnError) Error for " + clientId + ": " + e.getMessage());
|
|
|
|
- })
|
|
|
|
- .doFinally(signal -> {
|
|
|
|
- System.out.println("(doFinally) Connection closed for " + clientId + ": " + signal);
|
|
|
|
- sessions.remove(clientId);
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- // 需要返回一个Mono<Void>来表示处理完成
|
|
|
|
- return incomingMessages.then();
|
|
|
|
- });
|
|
|
|
- });
|
|
|
|
|
|
|
|
|
|
+ @Value("${comfyui.token}")
|
|
|
|
+ private String COMFYUI_TOKEN;
|
|
|
|
+ private final String BASE_URL = "http://43.128.1.201:8007";
|
|
|
|
+
|
|
|
|
+ private WebClient webClient;
|
|
|
|
+ public WebClient getWebClient() {
|
|
|
|
+ if (webClient == null) {
|
|
|
|
+ webClient = WebClient.builder().baseUrl(BASE_URL).filter(WebClientFilter.logFilter).build();
|
|
|
|
+ }
|
|
|
|
+ return webClient;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [ComfyUI] 断开 WebSocket 监听连接
|
|
|
|
- * @param clientId 客户端ID
|
|
|
|
- * @return Mono<Void> 表示断开操作
|
|
|
|
|
|
+ * [ComfyUI] 执行任务
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public Mono<Void> disconnect(String clientId) {
|
|
|
|
- return Mono.fromRunnable(() -> {
|
|
|
|
- WebSocketSession session = sessions.get(clientId);
|
|
|
|
- if (session != null) {
|
|
|
|
- System.out.println("disconnect success! clientId: " + clientId);
|
|
|
|
- session.close().subscribe();
|
|
|
|
- sessions.remove(clientId);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
|
|
+ public Mono<CFPromptResponse> prompt(String prompt) {
|
|
|
|
+
|
|
|
|
+ String client_id = Convert.toStr(UUID.randomUUID());
|
|
|
|
+
|
|
|
|
+ MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
|
|
|
|
+ params.add("client_id", client_id); // Unix时间戳、单位ms
|
|
|
|
+ params.add("prompt", prompt);
|
|
|
|
+
|
|
|
|
+ System.out.println("params = " + params);
|
|
|
|
+
|
|
|
|
+ String url = "/prompt?token=" + COMFYUI_TOKEN;
|
|
|
|
+ String uri = UriComponentsBuilder.fromUriString(url).toUriString();
|
|
|
|
+ WebClient webClient = getWebClient();
|
|
|
|
+ return webClient.get()
|
|
|
|
+ .uri(uri)
|
|
|
|
+ .accept(MediaType.APPLICATION_JSON)
|
|
|
|
+ .exchangeToMono(response -> {
|
|
|
|
+ return response.bodyToMono(CFPromptResponse.class);
|
|
|
|
+// if (response.statusCode().is2xxSuccessful()) {
|
|
|
|
+// return response.bodyToMono(CFPromptResponse.class); // 成功响应
|
|
|
|
+// } else {
|
|
|
|
+// return response.bodyToMono(CFPromptResponse.class).map(e -> KLingUtil.mapErrorResponse(e));
|
|
|
|
+// }
|
|
|
|
+ });
|
|
|
|
|
|
- /**
|
|
|
|
- * 获取所有活动的连接ID
|
|
|
|
- * @return 连接ID集合
|
|
|
|
- */
|
|
|
|
- public Flux<String> getActiveConnections() {
|
|
|
|
- return Flux.fromIterable(sessions.keySet());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|