package com.backendsys.aspect;

import jakarta.servlet.http.HttpServletRequest;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.List;
import java.util.UUID;

/**
 * Serializes calls to methods annotated with {@code @QueuingPoll} per tenant, using one Redis
 * list per tenant as a FIFO queue: requests of the same tenant run one after another, requests
 * of different tenants run in parallel.
 *
 * <p>Usage example:
 * <pre>{@code
 * @GetMapping("testQueue")
 * @QueuingPoll
 * public String testQueue(@RequestParam Integer tenantId, @RequestParam Integer time)
 *         throws InterruptedException {
 *     ...
 * }
 * }</pre>
 *
 * <p>NOTE(review): a ticket is only removed by the request that enqueued it. If the process
 * dies while its ticket is still in the list, nothing in this class removes that ticket and
 * later requests of the same tenant will keep waiting behind it; consider a TTL or a maximum
 * wait time if that is a concern.
 *
 * @author Jenson
 */
@Aspect
@Component
public class QueuingPollAspect {

    /** Prefix of the Redis list key; the tenant id is appended to it. */
    private static final String QUEUE_KEY_PREFIX = "jenson:list-thread:";

    /** Tenant id used when there is no current request or no {@code tenantId} parameter. */
    private static final String NO_TENANT = "null";

    /**
     * Pause between two looks at the head of the queue. Tune it to the average execution time
     * of the guarded endpoints; sleeping yields the CPU to other threads.
     */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    // Kept as a raw type and under this exact field name on purpose: changing either could
    // change which RedisTemplate bean gets injected (and therefore how values are serialized).
    @Autowired
    @SuppressWarnings("rawtypes")
    private RedisTemplate redisTemplate;

    /**
     * Around advice for {@code @QueuingPoll}: appends a unique ticket to the tenant's Redis
     * list, waits until that ticket is at the head of the list, runs the intercepted method and
     * finally removes the ticket again.
     *
     * <p>The ticket is removed in a {@code finally} block, so it is released on normal return,
     * on any {@link Throwable} thrown by the intercepted method, and when the waiting thread is
     * interrupted. It is removed by value, so only this request's own ticket is ever deleted.
     *
     * @param proceedingJoinPoint the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable anything thrown by the intercepted method, an
     *         {@link InterruptedException} if the thread is interrupted while waiting, or a
     *         Redis access failure
     */
    @Around(value = "@annotation(com.backendsys.aspect.QueuingPoll)")
    @SuppressWarnings("unchecked")
    public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        String key = resolveQueueKey();
        // A thread id may repeat across several application instances, so a UUID identifies
        // this request's place in the queue instead.
        String ticket = UUID.randomUUID().toString().replaceAll("-", "");

        redisTemplate.opsForList().rightPush(key, ticket);

        Throwable failure = null;
        try {
            awaitTurn(key, ticket);
            return proceedingJoinPoint.proceed();
        } catch (Throwable t) {
            failure = t;
            throw t;
        } finally {
            try {
                releaseTicket(key, ticket);
            } catch (RuntimeException cleanupError) {
                if (failure == null) {
                    throw cleanupError;
                }
                // Do not let a cleanup problem hide the original failure.
                failure.addSuppressed(cleanupError);
            }
        }
    }

    /**
     * Formerly the {@link AfterThrowing} advice that popped the head of the queue when the
     * intercepted method failed. The release now happens in the {@code finally} block of
     * {@link #translateReturning(ProceedingJoinPoint)}, which also covers errors and
     * interruption, so this method no longer touches Redis and is no longer registered as
     * advice (keeping it active would release the queue twice).
     *
     * @param joinPoint unused
     * @param e unused
     * @deprecated retained only for source compatibility; it does nothing.
     */
    @Deprecated
    public void throwingAdvice(JoinPoint joinPoint, Exception e) {
        // Intentionally empty: see translateReturning.
    }

    /**
     * Builds the Redis list key for the current request from its {@code tenantId} request
     * parameter; falls back to the literal tenant {@code "null"} when there is no current
     * request or the parameter is absent.
     */
    private static String resolveQueueKey() {
        ServletRequestAttributes attributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        String tenantId = NO_TENANT;
        if (attributes != null) {
            HttpServletRequest request = attributes.getRequest();
            String parameter = request.getParameter("tenantId");
            if (parameter != null) {
                tenantId = parameter;
            }
        }
        return QUEUE_KEY_PREFIX + tenantId;
    }

    /**
     * Blocks until {@code ticket} is the first element of the list at {@code key}, polling every
     * {@link #POLL_INTERVAL_MILLIS} milliseconds. Returns immediately if the list is empty or
     * missing, so a request is never stuck on a queue that has no head.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void awaitTurn(String key, String ticket) throws InterruptedException {
        while (true) {
            List head = redisTemplate.opsForList().range(key, 0, 0);
            boolean queueEmpty = head == null || head.isEmpty();
            if (queueEmpty || ticket.equals(head.get(0))) {
                return;
            }
            // Someone else is at the head of the queue: wait and look again.
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Removes the first occurrence of {@code ticket} from the list at {@code key}; a no-op if
     * the ticket is not in the list.
     */
    @SuppressWarnings("unchecked")
    private void releaseTicket(String key, String ticket) {
        redisTemplate.opsForList().remove(key, 1, ticket);
    }
}