1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package com.backendsys.aspect;
- import jakarta.servlet.http.HttpServletRequest;
- import org.aspectj.lang.JoinPoint;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.AfterThrowing;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.web.context.request.RequestContextHolder;
- import org.springframework.web.context.request.ServletRequestAttributes;
- import java.util.List;
- import java.util.UUID;
- /**
- * @author Jenson
- *
- * @GetMapping("testQueue")
- * @QueuingPoll
- * public String testQueue(@RequestParam Integer tenantId, @RequestParam Integer time) throws InterruptedException {
- */
- @Aspect
- @Component
- public class QueuingPollAspect {
- @Autowired
- private RedisTemplate<String, String> redisTemplate;
- @Around(value = "@annotation(com.backendsys.aspect.QueuingPoll)")
- public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
- ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
- String tenantId = "null";
- if (attributes != null) {
- HttpServletRequest request = attributes.getRequest();
- tenantId = request.getParameter("tenantId");
- }
- // Thread currentThread = Thread.currentThread();
- // 线程 ID 是唯一的,并且在其生命周期内保持不变。 当一个线程终止时,这个线程 ID 可能会被重用。
- // String threadId = String.valueOf(currentThread.getId());
- // 在多实例多情况下线程ID可能会导致重复,所以使用UUID
- String uuid = UUID.randomUUID().toString().replaceAll("-", "");
- // 相同的租户放入同一个redis队列里,实现同租户串行不同的租户并行
- String key = "jenson:list-thread:" + tenantId;
- redisTemplate.opsForList().rightPush(key, uuid);
- boolean waitFlag = Boolean.TRUE;
- while (waitFlag) {
- waitFlag = Boolean.FALSE;
- // System.out.println(threadName + "轮询查看是否轮到自己");
- List<String> top = redisTemplate.opsForList().range(key, 0, 0);
- if (top != null && top.size() > 0) {
- // redis 里有数据
- if (!uuid.equals(top.get(0))) {
- // 队列顶部不是该接口,线程等待
- waitFlag = Boolean.TRUE;
- }
- }
- if (waitFlag) {
- // 根据接口执行平均时长来适度调整休眠时间,休眠时会让出cpu给其他的线程
- Thread.sleep(100);
- }
- }
- Object result = proceedingJoinPoint.proceed();
- // 执行结束,推出队列顶端元素
- redisTemplate.opsForList().leftPop(key);
- return result;
- }
- @AfterThrowing(value = "@annotation(com.backendsys.aspect.QueuingPoll)", throwing = "e")
- public void throwingAdvice(JoinPoint joinPoint, Exception e) {
- ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
- String tenantId = "null";
- if (attributes != null) {
- HttpServletRequest request = attributes.getRequest();
- tenantId = request.getParameter("tenantId");
- }
- String key = "jenson:list-thread:" + tenantId;
- // 抛出错误时也要推出队列顶端元素,否则后面的接口就堵死了
- redisTemplate.opsForList().leftPop(key);
- }
- }
|