package com.backendsys.modules.log.controller; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import com.backendsys.exception.CustomException; import com.backendsys.modules.common.config.security.annotations.Anonymous; import com.backendsys.modules.common.utils.Result; import com.backendsys.modules.log.utils.DingTalkPushUtil; import com.backendsys.modules.sse.emitter.SseEmitterManager; import com.backendsys.modules.sse.utils.SseEmitterUTF8; import com.backendsys.modules.sse.utils.SseUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import reactor.core.publisher.Flux; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @Slf4j @RestController public class LogStreamController { @Autowired private SseUtil sseUtil; @Value("${log-stream.enable}") private Boolean isEnable; @Value("${log-stream.charset}") private String charset; @Value("${log-stream.exec}") private String exec; @Value("${log-stream.sign}") private String signValue; @Autowired private DingTalkPushUtil dingTalkPushUtil; @Anonymous @GetMapping("/api/test/sendDingMsg") public Result sendDingMsg() { // DingTalkPushUtil.testSendTextMsg(); // DingTalkPushUtil.testSendLinkMsg(); dingTalkPushUtil.testSendLinkMsg(); return Result.success(); } /** * [SSE] 消息监听 */ @Anonymous @GetMapping(value = "/api/log/stream/watch", produces = "text/event-stream") public SseEmitter stream(String sign) { if (!isEnable) return null; if (StrUtil.isEmpty(sign)) return null; if (!signValue.equals(sign)) return null; String userId = Convert.toStr(1L); SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE); SseEmitterManager manager = SseEmitterManager.getInstance(); try { // 如果存在,则关闭 sseUtil.closeIfExist(userId); // 创建新连接 manager.addEmitter(userId, emitter); emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)")); new Thread(() -> { try { Process process = Runtime.getRuntime().exec(exec); // "ping 127.0.0.1" | "docker logs -f backendsys"; BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset)); String line; while ((line = reader.readLine()) != null) { emitter.send(SseEmitter.event().data(line)); } // emitter.complete(); // 完成后关闭连接 } catch (IOException e) { System.out.println(e.getMessage()); // 不关闭,一直监听 // emitter.completeWithError(e); } }).start(); } catch (IOException e) { // emitter.complete(); // 关闭连接 manager.emitters.remove(emitter); } return emitter; } /** * [SSE] 测试发送 */ @Anonymous @GetMapping("/api/log/stream/send") public String send() { String message = "{\"message\": \"Hello World 中文\"}"; sseUtil.send(message); return "success"; } }