LogStreamController.java 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package com.backendsys.modules.log.controller;
  2. import cn.hutool.core.convert.Convert;
  3. import cn.hutool.core.util.StrUtil;
  4. import com.backendsys.exception.CustomException;
  5. import com.backendsys.modules.common.config.security.annotations.Anonymous;
  6. import com.backendsys.modules.common.utils.Result;
  7. import com.backendsys.modules.log.utils.DingTalkPushUtil;
  8. import com.backendsys.modules.sse.emitter.SseEmitterManager;
  9. import com.backendsys.modules.sse.utils.SseEmitterUTF8;
  10. import com.backendsys.modules.sse.utils.SseUtil;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.beans.factory.annotation.Value;
  14. import org.springframework.http.MediaType;
  15. import org.springframework.http.codec.ServerSentEvent;
  16. import org.springframework.web.bind.annotation.GetMapping;
  17. import org.springframework.web.bind.annotation.RestController;
  18. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  19. import reactor.core.publisher.Flux;
  20. import java.io.BufferedReader;
  21. import java.io.IOException;
  22. import java.io.InputStreamReader;
  23. import java.nio.charset.Charset;
  24. import java.nio.charset.StandardCharsets;
  25. import java.time.Duration;
  26. import java.time.LocalTime;
  27. import java.util.concurrent.TimeUnit;
  28. import java.util.concurrent.atomic.AtomicBoolean;
  29. import java.util.concurrent.atomic.AtomicLong;
  30. @Slf4j
  31. @RestController
  32. public class LogStreamController {
  33. @Autowired
  34. private SseUtil sseUtil;
  35. @Value("${log-stream.enable}")
  36. private Boolean isEnable;
  37. @Value("${log-stream.charset}")
  38. private String charset;
  39. @Value("${log-stream.exec}")
  40. private String exec;
  41. @Value("${log-stream.sign}")
  42. private String signValue;
  43. @Autowired
  44. private DingTalkPushUtil dingTalkPushUtil;
  45. @Anonymous
  46. @GetMapping("/api/test/sendDingMsg")
  47. public Result sendDingMsg() {
  48. // DingTalkPushUtil.testSendTextMsg();
  49. // DingTalkPushUtil.testSendLinkMsg();
  50. dingTalkPushUtil.testSendLinkMsg();
  51. return Result.success();
  52. }
  53. /**
  54. * [SSE] 消息监听
  55. */
  56. @Anonymous
  57. @GetMapping(value = "/api/log/stream/watch", produces = "text/event-stream")
  58. public SseEmitter stream(String sign) {
  59. if (!isEnable) return null;
  60. if (StrUtil.isEmpty(sign)) return null;
  61. if (!signValue.equals(sign)) return null;
  62. String userId = Convert.toStr(1L);
  63. SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
  64. SseEmitterManager manager = SseEmitterManager.getInstance();
  65. try {
  66. // 如果存在,则关闭
  67. sseUtil.closeIfExist(userId);
  68. // 创建新连接
  69. manager.addEmitter(userId, emitter);
  70. emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)"));
  71. new Thread(() -> {
  72. try {
  73. Process process = Runtime.getRuntime().exec(exec); // "ping 127.0.0.1" | "docker logs -f backendsys";
  74. BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));
  75. String line;
  76. while ((line = reader.readLine()) != null) {
  77. emitter.send(SseEmitter.event().data(line));
  78. }
  79. // emitter.complete(); // 完成后关闭连接
  80. } catch (IOException e) {
  81. System.out.println(e.getMessage());
  82. // 不关闭,一直监听
  83. // emitter.completeWithError(e);
  84. }
  85. }).start();
  86. } catch (IOException e) {
  87. // emitter.complete(); // 关闭连接
  88. manager.emitters.remove(emitter);
  89. }
  90. return emitter;
  91. }
  92. /**
  93. * [SSE] 测试发送
  94. */
  95. @Anonymous
  96. @GetMapping("/api/log/stream/send")
  97. public String send() {
  98. String message = "{\"message\": \"Hello World 中文\"}";
  99. sseUtil.send(message);
  100. return "success";
  101. }
  102. }