|
@@ -0,0 +1,202 @@
|
|
|
|
+package com.backendsys.modules.queue.service.impl;
|
|
|
|
+
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
|
+import com.backendsys.modules.queue.entity.Entire;
|
|
|
|
+import com.backendsys.modules.queue.service.TaskService;
|
|
|
|
+import org.springframework.data.domain.Range;
|
|
|
|
+import jakarta.annotation.PostConstruct;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.data.redis.RedisSystemException;
|
|
|
|
+import org.springframework.data.redis.connection.Limit;
|
|
|
|
+import org.springframework.data.redis.connection.stream.*;
|
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
|
+import org.springframework.data.redis.core.StreamOperations;
|
|
|
|
+import org.springframework.data.redis.stream.StreamListener;
|
|
|
|
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import java.time.Duration;
|
|
|
|
+import java.util.Collections;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+import static com.volcengine.model.tls.Const.STREAM;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 任务队列服务
|
|
|
|
+ */
|
|
|
|
+@Service
|
|
|
|
+public class TaskServiceImpl implements TaskService, StreamListener<String, MapRecord<String,String,String>> {
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private StreamMessageListenerContainer<String, MapRecord<String,String,String>> container;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private RedisTemplate redisTemplate;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 初始化消费者组
|
|
|
|
+ */
|
|
|
|
+ public void initConsumerGroup(String stream_key, String group_name) {
|
|
|
|
+ System.out.println("initConsumerGroup: [stream_key = " + stream_key + ", group_name = " + group_name + "]");
|
|
|
|
+ try {
|
|
|
|
+ redisTemplate.opsForStream().createGroup(stream_key, group_name);
|
|
|
|
+ } catch (RedisSystemException e) {
|
|
|
|
+ if (!e.getCause().getMessage().contains("BUSYGROUP")) {
|
|
|
|
+ throw e; // 忽略"组已存在"错误
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ container.receive(Consumer.from(group_name, "my-consumer"),
|
|
|
|
+ StreamOffset.create(STREAM, ReadOffset.lastConsumed()), this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取任务队列 (显示前5条)
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public List<MapRecord<String, String, String>> getTaskList(String stream_key) {
|
|
|
|
+ return redisTemplate.opsForStream()
|
|
|
|
+ .reverseRange(stream_key, Range.unbounded(), Limit.limit().count(5));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取任务队列 (未消费)
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public List<MapRecord<String, String, String>> getTaskPaddingList(String stream_key) {
|
|
|
|
+ String group_name = stream_key + ":group";
|
|
|
|
+
|
|
|
|
+ // 1. 先确保流和消费者组存在,只需建一次
|
|
|
|
+ String stream = "my-stream";
|
|
|
|
+ String group = "my-group";
|
|
|
|
+ try { // 如果组已存在会抛异常,忽略即可
|
|
|
|
+ redisTemplate.opsForStream().createGroup(stream, ReadOffset.latest(), group);
|
|
|
|
+ } catch (Exception ignored) {}
|
|
|
|
+
|
|
|
|
+ // 2. 轮询消费
|
|
|
|
+ StreamOperations<String, String, String> ops = redisTemplate.opsForStream();
|
|
|
|
+ while (true) {
|
|
|
|
+ List<MapRecord<String, String, String>> list = ops.read(
|
|
|
|
+ Consumer.from(group, "sync-consumer"),
|
|
|
|
+ StreamReadOptions.empty().count(5).block(Duration.ofSeconds(3)),
|
|
|
|
+ StreamOffset.create(stream, ReadOffset.lastConsumed())
|
|
|
|
+ );
|
|
|
|
+ if (list.isEmpty()) continue;
|
|
|
|
+
|
|
|
|
+ for (MapRecord<String, String, String> r : list) {
|
|
|
|
+ System.out.println("收到消息 id=" + r.getId() + " body=" + r.getValue());
|
|
|
|
+ // 3. ack
|
|
|
|
+ redisTemplate.opsForStream().acknowledge(stream, group, r.getId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 消费任务 (不移除)
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void consumeTask(String stream_key, String task_id) {
|
|
|
|
+ String group_name = stream_key + ":group";
|
|
|
|
+
|
|
|
|
+ // 用组里任意消费者读一条(不 ACK)
|
|
|
|
+ List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
|
|
|
|
+ .read(Consumer.from(group_name, "my-consumer"), StreamOffset.create(stream_key, ReadOffset.from(">")));
|
|
|
|
+ System.out.println("records = " + records);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+// Long acknowledged = redisTemplate.opsForStream().acknowledge(stream_key, stream_key + ":group", task_id);
|
|
|
|
+// System.out.println("acknowledged = " + acknowledged);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 添加任务
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public RecordId addTask(String stream_key, Map<String, Object> task_data) {
|
|
|
|
+ ObjectRecord<String, Map<String, Object>> task = StreamRecords.newRecord()
|
|
|
|
+ .ofObject(task_data)
|
|
|
|
+ .withStreamKey(stream_key);
|
|
|
|
+ // 添加任务
|
|
|
|
+ return redisTemplate.opsForStream().add(task);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 移除任务
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void removeTask(String stream_key, String task_id) {
|
|
|
|
+ redisTemplate.opsForStream().delete(stream_key, task_id);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(MapRecord<String, String, String> message) {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+// // 1. 安全获取消息
|
|
|
|
+// List<MapRecord<String, String, String>> task_list = redisTemplate.opsForStream()
|
|
|
|
+// .read(Consumer.from(stream_key + ":group", "consumer-1"),
|
|
|
|
+// StreamReadOptions.empty().count(1),
|
|
|
|
+// StreamOffset.create(stream_key, ReadOffset.lastConsumed()));
|
|
|
|
+
|
|
|
|
+// System.out.println("(consumeTask) task_list = " + task_list);
|
|
|
|
+
|
|
|
|
+// if (task_list != null && !task_list.isEmpty()) {
|
|
|
|
+// // 2. 处理消息
|
|
|
|
+// MapRecord<String, String, String> record = task_list.get(0);
|
|
|
|
+// try {
|
|
|
|
+ // 实际业务处理(如调用文生图API)
|
|
|
|
+ // processTask(record.getValue());
|
|
|
|
+
|
|
|
|
+// // 3. 确认消息(ACK)
|
|
|
|
+// redisTemplate.opsForStream().acknowledge(stream_key, stream_key + ":group", record.getId());
|
|
|
|
+
|
|
|
|
+ // 在ACK后追加删除操作
|
|
|
|
+// redisTemplate.opsForStream().delete(stream_key, record.getId());
|
|
|
|
+
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
+// System.out.println("任务处理失败: " + record.getId() + ", " + e.getMessage());
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取任务队列
|
|
|
|
+ */
|
|
|
|
+// @Override
|
|
|
|
+// public List<MapRecord<String, String, String>> getTaskList(String stream_key) {
|
|
|
|
+// return getTaskList(stream_key, null);
|
|
|
|
+// }
|
|
|
|
+// @Override
|
|
|
|
+// public List<MapRecord<String, String, String>> getTaskList(String stream_key, Entire entire) {
|
|
|
|
+//
|
|
|
|
+// // 1. 读取全部数据
|
|
|
|
+// List<MapRecord<String, String, String>> task_list = redisTemplate.opsForStream().read(StreamOffset.fromStart(stream_key));
|
|
|
|
+//
|
|
|
|
+// // 2. 根据 entire 过滤
|
|
|
|
+// if (entire != null) {
|
|
|
|
+// if (StrUtil.isNotEmpty(entire.getKey()) && ObjectUtil.isNotEmpty(entire.getValue())) {
|
|
|
|
+// task_list.stream().filter(task -> {
|
|
|
|
+// return entire.getValue().equals(task.getValue().get(entire.getKey()));
|
|
|
|
+// }).collect(Collectors.toList());
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// return task_list;
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+}
|