Przeglądaj źródła

完成WebClient工具类封装;Dev 可灵

tsurumure 3 miesięcy temu
rodzic
commit
990daa0a68

+ 9 - 0
src/main/java/com/backendsys/exception/GlobalExceptionHandler.java

@@ -21,6 +21,7 @@ import org.springframework.core.MethodParameter;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.data.redis.RedisConnectionFailureException;
 import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.http.converter.HttpMessageConverter;
 import org.springframework.http.converter.HttpMessageNotReadableException;
 import org.springframework.http.server.ServerHttpRequest;
@@ -34,6 +35,7 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 import org.springframework.web.context.request.RequestContextHolder;
 import org.springframework.web.context.request.ServletRequestAttributes;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import org.springframework.web.servlet.NoHandlerFoundException;
 import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
 
@@ -405,5 +407,12 @@ public class GlobalExceptionHandler implements ResponseBodyAdvice<Object> {
 //        return Result.error(ResultEnum.SERVICE_EXCEPTION.getCode(), e.getMessage());
 //    }
 
+    @ExceptionHandler(WebClientResponseException.class)
+    public Result handleWebClientResponseException(WebClientResponseException e) {
+        System.out.println("****** WebClientResponseException.class: ******");
+        printErrorException(e);
+        return Result.error(ResultEnum.SERVICE_EXCEPTION.getCode(), resultEnumService.getMessage(ResultEnum.SERVICE_EXCEPTION), e.getMessage());
+    }
+
 }
 

+ 21 - 0
src/main/java/com/backendsys/modules/common/config/WebClientConfig.java

@@ -0,0 +1,21 @@
+//package com.backendsys.modules.common.config;
+//
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.web.reactive.function.client.WebClient;
+//
+//@Configuration
+//public class WebClientConfig {
+//
+//    @Bean
+//    public WebClient.Builder webClientBuilder() {
+//        return WebClient.builder();
+//    }
+//
+//    @Bean
+//    public WebClient webClient(WebClient.Builder builder) {
+//        return builder.build();
+//    }
+//
+//    // 可以根据需要配置多个 WebClient 实例
+//}

+ 178 - 0
src/main/java/com/backendsys/modules/common/utils/WebClientUtil.java

@@ -0,0 +1,178 @@
+package com.backendsys.modules.common.utils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+
+public class WebClientUtil {
+
+    // 请求/响应拦截器
+    static ExchangeFilterFunction logFilter = (request, next) -> {
+        System.out.println("[logRequestFilter] Request: " + request.method() + " " + request.url());
+        return next.exchange(request).flatMap(response -> {
+
+//            System.out.println("[logRequestFilter] Response status code: " + response.statusCode());
+//            // 记录响应体内容,但不直接订阅
+//            return response.bodyToMono(String.class)
+//                .doOnNext(body -> System.out.println("[logRequestFilter] Response body: " + body))
+//                .thenReturn(response);
+
+            // 读取并缓存整个响应体
+            return DataBufferUtils.join(response.bodyToFlux(DataBuffer.class))
+                .flatMap(dataBuffer -> {
+                    // 提取字节数组并释放缓冲区
+                    byte[] bodyBytes = new byte[dataBuffer.readableByteCount()];
+                    dataBuffer.read(bodyBytes);
+                    DataBufferUtils.release(dataBuffer);
+
+                    // 记录响应体
+                    String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
+                    System.out.println("[logRequestFilter] Response body: " + bodyStr);
+
+                    // 重新构建完整的响应
+                    return Mono.just(ClientResponse.create(response.statusCode())
+                        .headers(headers -> headers.addAll(response.headers().asHttpHeaders()))
+                        .body(Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bodyBytes)))
+                        .build());
+                });
+
+        });
+
+    };
+
+    // 实例化 WebClient
+//    private static final WebClient webClient = WebClient.builder().build();
+    private static final WebClient webClient = WebClient.builder()
+            .exchangeStrategies(ExchangeStrategies.builder()
+                .codecs(configurer -> configurer.defaultCodecs().enableLoggingRequestDetails(true))
+                .build())
+            .filter(logFilter)  // 确保 logFilter 在缓冲后使用
+            .build();
+
+    // 默认的响应类型
+    private static final Class<Object> DEFAULT_RESPONSE_TYPE = Object.class;
+
+    // [Get] 公共逻辑 (参数示例)
+    // - 自定义头部: Consumer<HttpHeaders> httpHeaders = (headers) -> { headers.add("Authorization", "Bearer " + getToken()); };
+    // - 自定义返回类型: Class<T> responseType = JSONObject.class;
+    private static <T> Mono<T> doGetInternal(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
+        String uri = UriComponentsBuilder.fromUriString(url).queryParams(params).toUriString();
+        return webClient.get().uri(uri).headers(httpHeaders).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(responseType);
+    }
+
+    /**
+     * 同步调用示例:
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(url);
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, JSONObject.class);
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, params, JSONObject.class);
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(config.URL + "/v1/images/generations", params, kLingUtil.getHttpHeaders(), JSONObject.class);
+     * return JSONUtil.parseObj(responseMono.block());
+     */
+    // 无参数、无自定义头、无响应类型
+    public static Mono<Object> doGet(String url) {
+        return doGetInternal(url, null, null, DEFAULT_RESPONSE_TYPE);
+    }
+    // 无参数、无自定义头
+    public static <T> Mono<T> doGet(String url, Class<T> responseType) {
+        return doGetInternal(url, null, null, responseType);
+    }
+    // 无参数、有自定义头、无响应类型
+    public static Mono<Object> doGet(String url, Consumer<HttpHeaders> httpHeaders) {
+        return doGetInternal(url, null, httpHeaders, DEFAULT_RESPONSE_TYPE);
+    }
+    // 无参数、有自定义头、有响应类型
+    public static <T> Mono<T> doGet(String url, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
+        return doGetInternal(url, null, httpHeaders, responseType);
+    }
+    // 有参数、无自定义头、无响应类型
+    public static Mono<Object> doGet(String url, MultiValueMap<String, String> params) {
+        return doGetInternal(url, params, headers -> {}, DEFAULT_RESPONSE_TYPE);
+    }
+    // 有参数、无自定义头、有响应类型
+    public static <T> Mono<T> doGet(String url, MultiValueMap<String, String> params, Class<T> responseType) {
+        return doGetInternal(url, params, headers -> {}, responseType);
+    }
+    // 有参数、有自定义头、无响应类型
+    public static Mono<Object> doGet(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders) {
+        return doGetInternal(url, params, httpHeaders, DEFAULT_RESPONSE_TYPE);
+    }
+    // 有参数、有自定义头、有响应类型
+    public static <T> Mono<T> doGet(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
+        return doGetInternal(url, params, httpHeaders, responseType);
+    }
+
+
+    /**
+     * 异步调用示例:
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, JSONObject.class);
+     * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, params, JSONObject.class);
+     * responseMono.subscribe(
+     * 		response -> System.out.println("Response: " + response),
+     * 		e -> e.printStackTrace()
+     * );
+     */
+    public static <T> CompletableFuture<T> doGetAsync(String url, Class<T> responseType) {
+        return webClient.get()
+            .uri(url)
+            .accept(MediaType.APPLICATION_JSON)
+            .retrieve()
+            .bodyToMono(responseType)
+            .toFuture();
+    }
+    public static <T> CompletableFuture<T> doGetAsync(String url, MultiValueMap<String, String> params, Class<T> responseType) {
+        String uri = UriComponentsBuilder.fromUriString(url).queryParams(params).toUriString();
+        return webClient.get()
+            .uri(uri)
+            .accept(MediaType.APPLICATION_JSON)
+            .retrieve()
+            .bodyToMono(responseType)
+            .toFuture();
+    }
+
+    public static <T> Mono<T> doPost(String url, Object request, Class<T> responseType) {
+        return webClient.post()
+            .uri(url)
+            .contentType(MediaType.APPLICATION_JSON)
+            .accept(MediaType.APPLICATION_JSON)
+            .bodyValue(request)
+            .retrieve()
+            .bodyToMono(responseType);
+    }
+
+    public static <T> Mono<T> doPut(String url, Object request, Class<T> responseType) {
+        return webClient.put()
+            .uri(url)
+            .contentType(MediaType.APPLICATION_JSON)
+            .accept(MediaType.APPLICATION_JSON)
+            .bodyValue(request)
+            .retrieve()
+            .bodyToMono(responseType);
+    }
+
+    public static Mono<Void> doDelete(String url) {
+        return webClient.delete()
+            .uri(url)
+            .retrieve()
+            .bodyToMono(Void.class);
+    }
+
+}

+ 29 - 1
src/main/java/com/backendsys/modules/sdk/klingai/controller/KLingDemoController.java

@@ -1,20 +1,48 @@
 package com.backendsys.modules.sdk.klingai.controller;
 
+import cn.hutool.json.JSONUtil;
 import com.backendsys.modules.common.config.security.annotations.Anonymous;
+import com.backendsys.modules.common.utils.Result;
+import com.backendsys.modules.sdk.klingai.entity.KLResponse;
 import com.backendsys.modules.sdk.klingai.service.KLingService;
+import com.backendsys.modules.sdk.klingai.utils.KLingUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 public class KLingDemoController {
 
+    @Autowired
+    private KLingUtil kLingUtil;
     @Autowired
     private KLingService klingService;
 
     @GetMapping("/api/klingai/getToken")
     public String getToken() {
-        return klingService.getToken();
+        return kLingUtil.getToken();
+    }
+
+    /**
+     * 【图像生成】查询任务(单个)(同步)
+     */
+    @GetMapping("/api/klingai/queryImageTask")
+    public Result queryImageTask(String task_id) {
+        Mono<KLResponse> responseMono = klingService.queryImageTask(task_id);
+        return Result.success().put("data", responseMono.block());
+    }
+    /**
+     * 【图像生成】查询任务(单个)(异步)
+     */
+    @GetMapping("/api/klingai/queryImageTaskAsync")
+    public Result queryImageTaskAsync(String task_id) {
+        Mono<KLResponse> responseMono = klingService.queryImageTask(task_id);
+        responseMono.subscribe(response -> {
+            System.out.println("异步结果: " + response);
+            System.out.println("异步结果 (data): " + response.getData());
+        });
+        return Result.success();
     }
 
 }

+ 11 - 0
src/main/java/com/backendsys/modules/sdk/klingai/entity/KLResponse.java

@@ -0,0 +1,11 @@
+package com.backendsys.modules.sdk.klingai.entity;
+
+import lombok.Data;
+
+@Data
+public class KLResponse {
+    private Integer code;
+    private String message;
+    private String request_id;
+    private Object data;
+}

+ 4 - 5
src/main/java/com/backendsys/modules/sdk/klingai/service/KLingService.java

@@ -1,15 +1,14 @@
 package com.backendsys.modules.sdk.klingai.service;
 
 import cn.hutool.json.JSONObject;
+import com.backendsys.modules.sdk.klingai.entity.KLResponse;
+import reactor.core.publisher.Mono;
 
 public interface KLingService {
 
-    // 获取接口鉴权Token
-    String getToken();
-
     // 【图像生成】创建任务
-    JSONObject generationImage();
+    Mono<JSONObject> generationImage();
 
     // 【图像生成】查询任务(单个)
-    JSONObject queryImage(String taskId);
+    Mono<KLResponse> queryImageTask(String taskId);
 }

+ 17 - 63
src/main/java/com/backendsys/modules/sdk/klingai/service/impl/KLingServiceImpl.java

@@ -1,92 +1,46 @@
 package com.backendsys.modules.sdk.klingai.service.impl;
 
-import cn.hutool.core.convert.Convert;
 import cn.hutool.json.JSONObject;
-import com.auth0.jwt.JWT;
-import com.auth0.jwt.algorithms.Algorithm;
-import com.backendsys.modules.common.config.redis.utils.RedisUtil;
+import com.backendsys.modules.common.utils.WebClientUtil;
 import com.backendsys.modules.sdk.klingai.config.KLingConfig;
+import com.backendsys.modules.sdk.klingai.entity.KLResponse;
 import com.backendsys.modules.sdk.klingai.service.KLingService;
 
-import okhttp3.OkHttpClient;
+import com.backendsys.modules.sdk.klingai.utils.KLingUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import reactor.core.publisher.Mono;
 
 @Service
 public class KLingServiceImpl implements KLingService {
 
     @Autowired
     private KLingConfig config;
-    @Autowired
-    private RedisUtil redisUtil;
-
-
-    private final OkHttpClient client;
-    public KLingServiceImpl() {
-        client = new OkHttpClient();
-    }
-
-    /**
-     * 接口鉴权:https://docs.qingque.cn/d/home/eZQAyImcbaS0fz-8ANjXvU5ed?identityId=1oEG9JKKMFv#section=h.9mwdken9otn8
-     */
-    public String sign() {
-        try {
-            Date expiredAt = new Date(System.currentTimeMillis() + 1800*1000); // 有效时间,此处示例代表当前时间+1800s(30min)
-            Date notBefore = new Date(System.currentTimeMillis() - 5*1000); //开始生效的时间,此处示例代表当前时间-5秒
-            Algorithm algo = Algorithm.HMAC256(config.SECRET_KEY);
-            Map<String, Object> header = new HashMap<>();
-            header.put("alg", "HS256");
-            return JWT.create()
-                .withIssuer(config.ACCESS_KEY)
-                .withHeader(header)
-                .withExpiresAt(expiredAt)
-                .withNotBefore(notBefore)
-                .sign(algo);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return null;
-        }
-    }
-
-    /**
-     * 获取接口鉴权Token (增加缓存:10秒)
-     */
-    @Override
-    public String getToken() {
-        String token = "";
-        String cacheKey = "klingai:token";
-        String cacheValue = redisUtil.getCacheObject(cacheKey);
-        if (cacheValue == null) {
-            token = sign();
-            if (token != null) {
-                Integer timeout = Convert.toInt(config.TOKEN_DURATION_TIME);
-                redisUtil.setCacheObject(cacheKey, token, timeout, TimeUnit.MILLISECONDS);
-            }
-        } else {
-            token = cacheValue;
-        }
-        return token;
-    }
 
+    @Autowired
+    private KLingUtil kLingUtil;
 
     /**
      * 【图像生成】创建任务
      */
     @Override
-    public JSONObject generationImage() {
+    public Mono<JSONObject> generationImage() {
         return null;
     }
 
     /**
      * 【图像生成】查询任务(单个)
+     * Mono<Object> responseMono = klingService.queryImageTask(task_id);
+     * responseMono.block();                                                // 同步发起订阅
+     * responseMono.subscribe(response -> System.out.println(response));    // 异步发起订阅
+     * responseMono.subscribe().dispose();                                  // 中断/取消订阅
      */
-    public JSONObject queryImage(String taskId) {
-        return null;
+    public Mono<KLResponse> queryImageTask(String task_id) {
+        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
+        params.add("task_id", task_id);
+        return WebClientUtil.doGet(config.URL + "/v1/images/generations", params, kLingUtil.getHttpHeaders(), KLResponse.class);
     }
 
 

+ 76 - 0
src/main/java/com/backendsys/modules/sdk/klingai/utils/KLingUtil.java

@@ -0,0 +1,76 @@
+package com.backendsys.modules.sdk.klingai.utils;
+
+import cn.hutool.core.convert.Convert;
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.backendsys.modules.common.config.redis.utils.RedisUtil;
+import com.backendsys.modules.sdk.klingai.config.KLingConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Component
+public class KLingUtil {
+
+    @Autowired
+    private KLingConfig config;
+    @Autowired
+    private RedisUtil redisUtil;
+
+    /**
+     * 接口鉴权:https://docs.qingque.cn/d/home/eZQAyImcbaS0fz-8ANjXvU5ed?identityId=1oEG9JKKMFv#section=h.9mwdken9otn8
+     */
+    public String sign() {
+        try {
+            Date expiredAt = new Date(System.currentTimeMillis() + 1800*1000); // 有效时间,此处示例代表当前时间+1800s(30min)
+            Date notBefore = new Date(System.currentTimeMillis() - 5*1000); //开始生效的时间,此处示例代表当前时间-5秒
+            Algorithm algo = Algorithm.HMAC256(config.SECRET_KEY);
+            Map<String, Object> header = new HashMap<>();
+            header.put("alg", "HS256");
+            return JWT.create()
+                .withIssuer(config.ACCESS_KEY)
+                .withHeader(header)
+                .withExpiresAt(expiredAt)
+                .withNotBefore(notBefore)
+                .sign(algo);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * 获取接口鉴权Token (增加缓存:10秒)
+     */
+    public String getToken() {
+        String token = "";
+        String cacheKey = "klingai:token";
+        String cacheValue = redisUtil.getCacheObject(cacheKey);
+        if (cacheValue == null) {
+            token = sign();
+            if (token != null) {
+                Integer timeout = Convert.toInt(config.TOKEN_DURATION_TIME);
+                redisUtil.setCacheObject(cacheKey, token, timeout, TimeUnit.MILLISECONDS);
+            }
+        } else {
+            token = cacheValue;
+        }
+        return token;
+    }
+
+    /**
+     * 获取 Token 拼接好的 HttpHeaders
+     */
+    public Consumer<HttpHeaders> getHttpHeaders() {
+        Consumer<HttpHeaders> httpHeaders = (headers) -> {
+            headers.add("Authorization", "Bearer " + getToken());
+        };
+        return httpHeaders;
+    }
+}