123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package com.backendsys.modules.log.controller;
- import cn.hutool.core.convert.Convert;
- import cn.hutool.core.util.StrUtil;
- import com.backendsys.exception.CustomException;
- import com.backendsys.modules.common.config.security.annotations.Anonymous;
- import com.backendsys.modules.common.utils.Result;
- import com.backendsys.modules.log.utils.DingTalkPushUtil;
- import com.backendsys.modules.sse.emitter.SseEmitterManager;
- import com.backendsys.modules.sse.utils.SseEmitterUTF8;
- import com.backendsys.modules.sse.utils.SseUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.http.MediaType;
- import org.springframework.http.codec.ServerSentEvent;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
- import reactor.core.publisher.Flux;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.nio.charset.Charset;
- import java.nio.charset.StandardCharsets;
- import java.time.Duration;
- import java.time.LocalTime;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicLong;
- @Slf4j
- @RestController
- public class LogStreamController {
- @Autowired
- private SseUtil sseUtil;
- @Value("${log-stream.enable}")
- private Boolean isEnable;
- @Value("${log-stream.charset}")
- private String charset;
- @Value("${log-stream.exec}")
- private String exec;
- @Value("${log-stream.sign}")
- private String signValue;
- @Autowired
- private DingTalkPushUtil dingTalkPushUtil;
- @Anonymous
- @GetMapping("/api/test/sendDingMsg")
- public Result sendDingMsg() {
- // DingTalkPushUtil.testSendTextMsg();
- // DingTalkPushUtil.testSendLinkMsg();
- dingTalkPushUtil.testSendLinkMsg();
- return Result.success();
- }
- /**
- * [SSE] 消息监听
- */
- @Anonymous
- @GetMapping(value = "/api/log/stream/watch", produces = "text/event-stream")
- public SseEmitter stream(String sign) {
- if (!isEnable) return null;
- if (StrUtil.isEmpty(sign)) return null;
- if (!signValue.equals(sign)) return null;
- String userId = Convert.toStr(1L);
- SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
- SseEmitterManager manager = SseEmitterManager.getInstance();
- try {
- // 如果存在,则关闭
- sseUtil.closeIfExist(userId);
- // 创建新连接
- manager.addEmitter(userId, emitter);
- emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)"));
- new Thread(() -> {
- try {
- Process process = Runtime.getRuntime().exec(exec); // "ping 127.0.0.1" | "docker logs -f backendsys";
- BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));
- String line;
- while ((line = reader.readLine()) != null) {
- emitter.send(SseEmitter.event().data(line));
- }
- // emitter.complete(); // 完成后关闭连接
- } catch (IOException e) {
- System.out.println(e.getMessage());
- // 不关闭,一直监听
- // emitter.completeWithError(e);
- }
- }).start();
- } catch (IOException e) {
- // emitter.complete(); // 关闭连接
- manager.emitters.remove(emitter);
- }
- return emitter;
- }
- /**
- * [SSE] 测试发送
- */
- @Anonymous
- @GetMapping("/api/log/stream/send")
- public String send() {
- String message = "{\"message\": \"Hello World 中文\"}";
- sseUtil.send(message);
- return "success";
- }
- }
|