|
@@ -1,26 +1,34 @@
|
|
|
package com.backendsys.modules.common.utils;
|
|
|
|
|
|
+import org.reactivestreams.Publisher;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.core.io.buffer.DataBuffer;
|
|
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
+import org.springframework.http.HttpMethod;
|
|
|
import org.springframework.http.MediaType;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.MultiValueMap;
|
|
|
-import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
|
|
|
-import org.springframework.web.reactive.function.client.ExchangeStrategies;
|
|
|
-import org.springframework.web.reactive.function.client.WebClient;
|
|
|
+import org.springframework.web.reactive.function.BodyInserters;
|
|
|
+import org.springframework.web.reactive.function.client.*;
|
|
|
import org.springframework.web.util.UriComponentsBuilder;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
+import java.net.URLDecoder;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
public class WebClientUtil {
|
|
@@ -28,13 +36,38 @@ public class WebClientUtil {
|
|
|
// 请求/响应拦截器
|
|
|
static ExchangeFilterFunction logFilter = (request, next) -> {
|
|
|
System.out.println("[logRequestFilter] Request: " + request.method() + " " + request.url());
|
|
|
- return next.exchange(request).flatMap(response -> {
|
|
|
|
|
|
-// System.out.println("[logRequestFilter] Response status code: " + response.statusCode());
|
|
|
-// // 记录响应体内容,但不直接订阅
|
|
|
-// return response.bodyToMono(String.class)
|
|
|
-// .doOnNext(body -> System.out.println("[logRequestFilter] Response body: " + body))
|
|
|
-// .thenReturn(response);
|
|
|
+ // 为什么不能更简单?根本原因是响应式流的 单次消费原则
|
|
|
+ // 在响应式编程中处理请求体内容相对复杂,因为请求体是一个只能被消费一次的流
|
|
|
+ // 所以,这里不做参数拦截
|
|
|
+
|
|
|
+
|
|
|
+// // 获取请求方法
|
|
|
+// HttpMethod method = request.method();
|
|
|
+
|
|
|
+// // 对于 GET 请求,记录 URL 参数
|
|
|
+// if (method.equals(HttpMethod.GET)) {
|
|
|
+// String query = request.url().getQuery();
|
|
|
+// if (query != null) {
|
|
|
+// Map<String, List<String>> queryParams = new LinkedHashMap<>();
|
|
|
+// for (String param : query.split("&")) {
|
|
|
+// String[] pair = param.split("=");
|
|
|
+// String key = URLDecoder.decode(pair[0], StandardCharsets.UTF_8);
|
|
|
+// String value = pair.length > 1 ? URLDecoder.decode(pair[1], StandardCharsets.UTF_8) : "";
|
|
|
+// queryParams.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
|
|
|
+// }
|
|
|
+// System.out.println("[logRequestFilter] Request Url Params: " + queryParams);
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// // 对于 POST、PUT、DELETE 请求,记录 JSON 正文
|
|
|
+// if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.DELETE)) {
|
|
|
+// request.bodyToMono(String.class)
|
|
|
+// .doOnNext(body -> System.out.println("[Request Body] " + body))
|
|
|
+// .subscribe(); // 重要:单独订阅
|
|
|
+// }
|
|
|
+
|
|
|
+ return next.exchange(request).flatMap(response -> {
|
|
|
|
|
|
// 读取并缓存整个响应体
|
|
|
return DataBufferUtils.join(response.bodyToFlux(DataBuffer.class))
|
|
@@ -46,7 +79,7 @@ public class WebClientUtil {
|
|
|
|
|
|
// 记录响应体
|
|
|
String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
|
|
|
- System.out.println("[logRequestFilter] Response body: " + bodyStr);
|
|
|
+ System.out.println("[logRequestFilter] Response Body: " + bodyStr);
|
|
|
|
|
|
// 重新构建完整的响应
|
|
|
return Mono.just(ClientResponse.create(response.statusCode())
|
|
@@ -60,11 +93,7 @@ public class WebClientUtil {
|
|
|
};
|
|
|
|
|
|
// 实例化 WebClient
|
|
|
-// private static final WebClient webClient = WebClient.builder().build();
|
|
|
private static final WebClient webClient = WebClient.builder()
|
|
|
- .exchangeStrategies(ExchangeStrategies.builder()
|
|
|
- .codecs(configurer -> configurer.defaultCodecs().enableLoggingRequestDetails(true))
|
|
|
- .build())
|
|
|
.filter(logFilter) // 确保 logFilter 在缓冲后使用
|
|
|
.build();
|
|
|
|
|
@@ -78,15 +107,21 @@ public class WebClientUtil {
|
|
|
String uri = UriComponentsBuilder.fromUriString(url).queryParams(params).toUriString();
|
|
|
return webClient.get().uri(uri).headers(httpHeaders).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(responseType);
|
|
|
}
|
|
|
+ private static <T> Mono<T> doPostInternal(String url, MultiValueMap<String, String> requestBody, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
|
|
|
+ return webClient.post().uri(url).headers(httpHeaders).accept(MediaType.APPLICATION_JSON).bodyValue(requestBody).retrieve().bodyToMono(responseType);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
- * 同步调用示例:
|
|
|
* Mono<JSONObject> responseMono = WebClientUtil.doGet(url);
|
|
|
* Mono<JSONObject> responseMono = WebClientUtil.doGet(url, JSONObject.class);
|
|
|
* Mono<JSONObject> responseMono = WebClientUtil.doGet(url, params, JSONObject.class);
|
|
|
* Mono<JSONObject> responseMono = WebClientUtil.doGet(config.URL + "/v1/images/generations", params, kLingUtil.getHttpHeaders(), JSONObject.class);
|
|
|
+ * 同步调用示例:
|
|
|
* return JSONUtil.parseObj(responseMono.block());
|
|
|
+ * 异步调用示例:
|
|
|
+ * responseMono.subscribe(response -> System.out.println("Response: " + response));
|
|
|
*/
|
|
|
+
|
|
|
// 无参数、无自定义头、无响应类型
|
|
|
public static Mono<Object> doGet(String url) {
|
|
|
return doGetInternal(url, null, null, DEFAULT_RESPONSE_TYPE);
|
|
@@ -105,11 +140,11 @@ public class WebClientUtil {
|
|
|
}
|
|
|
// 有参数、无自定义头、无响应类型
|
|
|
public static Mono<Object> doGet(String url, MultiValueMap<String, String> params) {
|
|
|
- return doGetInternal(url, params, headers -> {}, DEFAULT_RESPONSE_TYPE);
|
|
|
+ return doGetInternal(url, params, null, DEFAULT_RESPONSE_TYPE);
|
|
|
}
|
|
|
// 有参数、无自定义头、有响应类型
|
|
|
public static <T> Mono<T> doGet(String url, MultiValueMap<String, String> params, Class<T> responseType) {
|
|
|
- return doGetInternal(url, params, headers -> {}, responseType);
|
|
|
+ return doGetInternal(url, params, null, responseType);
|
|
|
}
|
|
|
// 有参数、有自定义头、无响应类型
|
|
|
public static Mono<Object> doGet(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders) {
|
|
@@ -121,43 +156,54 @@ public class WebClientUtil {
|
|
|
}
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * 异步调用示例:
|
|
|
- * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, JSONObject.class);
|
|
|
- * Mono<JSONObject> responseMono = WebClientUtil.doGet(url, params, JSONObject.class);
|
|
|
- * responseMono.subscribe(
|
|
|
- * response -> System.out.println("Response: " + response),
|
|
|
- * e -> e.printStackTrace()
|
|
|
- * );
|
|
|
- */
|
|
|
- public static <T> CompletableFuture<T> doGetAsync(String url, Class<T> responseType) {
|
|
|
- return webClient.get()
|
|
|
- .uri(url)
|
|
|
- .accept(MediaType.APPLICATION_JSON)
|
|
|
- .retrieve()
|
|
|
- .bodyToMono(responseType)
|
|
|
- .toFuture();
|
|
|
+
|
|
|
+ // 无参数、无自定义头、无响应类型
|
|
|
+ public static Mono<Object> doPost(String url) {
|
|
|
+ return doPostInternal(url, null, null, Object.class);
|
|
|
}
|
|
|
- public static <T> CompletableFuture<T> doGetAsync(String url, MultiValueMap<String, String> params, Class<T> responseType) {
|
|
|
- String uri = UriComponentsBuilder.fromUriString(url).queryParams(params).toUriString();
|
|
|
- return webClient.get()
|
|
|
- .uri(uri)
|
|
|
- .accept(MediaType.APPLICATION_JSON)
|
|
|
- .retrieve()
|
|
|
- .bodyToMono(responseType)
|
|
|
- .toFuture();
|
|
|
+
|
|
|
+ // 无参数、无自定义头、有响应类型
|
|
|
+ public static <T> Mono<T> doPost(String url, Class<T> responseType) {
|
|
|
+ return doPostInternal(url, null, null, responseType);
|
|
|
}
|
|
|
|
|
|
- public static <T> Mono<T> doPost(String url, Object request, Class<T> responseType) {
|
|
|
- return webClient.post()
|
|
|
- .uri(url)
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
- .accept(MediaType.APPLICATION_JSON)
|
|
|
- .bodyValue(request)
|
|
|
- .retrieve()
|
|
|
- .bodyToMono(responseType);
|
|
|
+ // 无参数、有自定义头、无响应类型
|
|
|
+ public static Mono<Object> doPost(String url, Consumer<HttpHeaders> httpHeaders) {
|
|
|
+ return doPostInternal(url, null, httpHeaders, Object.class);
|
|
|
}
|
|
|
|
|
|
+ // 无参数、有自定义头、有响应类型
|
|
|
+ public static <T> Mono<T> doPost(String url, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
|
|
|
+ return doPostInternal(url, null, httpHeaders, responseType);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 有参数、无自定义头、无响应类型
|
|
|
+ public static Mono<Object> doPost(String url, MultiValueMap<String, String> params) {
|
|
|
+ return doPostInternal(url, params, null, Object.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 有参数、无自定义头、有响应类型
|
|
|
+ public static <T> Mono<T> doPost(String url, MultiValueMap<String, String> params, Class<T> responseType) {
|
|
|
+ return doPostInternal(url, params, null, responseType);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 有参数、有自定义头、无响应类型
|
|
|
+ public static Mono<Object> doPost(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders) {
|
|
|
+ return doPostInternal(url, params, httpHeaders, Object.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 有参数、有自定义头、有响应类型
|
|
|
+ public static <T> Mono<T> doPost(String url, MultiValueMap<String, String> params, Consumer<HttpHeaders> httpHeaders, Class<T> responseType) {
|
|
|
+ return doPostInternal(url, params, httpHeaders, responseType);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
public static <T> Mono<T> doPut(String url, Object request, Class<T> responseType) {
|
|
|
return webClient.put()
|
|
|
.uri(url)
|