QueuingPollAspect.java 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package com.backendsys.aspect;
  2. import jakarta.servlet.http.HttpServletRequest;
  3. import org.aspectj.lang.JoinPoint;
  4. import org.aspectj.lang.ProceedingJoinPoint;
  5. import org.aspectj.lang.annotation.AfterThrowing;
  6. import org.aspectj.lang.annotation.Around;
  7. import org.aspectj.lang.annotation.Aspect;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.data.redis.core.RedisTemplate;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.web.context.request.RequestContextHolder;
  12. import org.springframework.web.context.request.ServletRequestAttributes;
  13. import java.util.List;
  14. import java.util.UUID;
  15. /**
  16. * @author Jenson
  17. *
  18. * @GetMapping("testQueue")
  19. * @QueuingPoll
  20. * public String testQueue(@RequestParam Integer tenantId, @RequestParam Integer time) throws InterruptedException {
  21. */
  22. @Aspect
  23. @Component
  24. public class QueuingPollAspect {
  25. @Autowired
  26. private RedisTemplate<String, String> redisTemplate;
  27. @Around(value = "@annotation(com.backendsys.aspect.QueuingPoll)")
  28. public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
  29. ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
  30. String tenantId = "null";
  31. if (attributes != null) {
  32. HttpServletRequest request = attributes.getRequest();
  33. tenantId = request.getParameter("tenantId");
  34. }
  35. // Thread currentThread = Thread.currentThread();
  36. // 线程 ID 是唯一的,并且在其生命周期内保持不变。 当一个线程终止时,这个线程 ID 可能会被重用。
  37. // String threadId = String.valueOf(currentThread.getId());
  38. // 在多实例多情况下线程ID可能会导致重复,所以使用UUID
  39. String uuid = UUID.randomUUID().toString().replaceAll("-", "");
  40. // 相同的租户放入同一个redis队列里,实现同租户串行不同的租户并行
  41. String key = "jenson:list-thread:" + tenantId;
  42. redisTemplate.opsForList().rightPush(key, uuid);
  43. boolean waitFlag = Boolean.TRUE;
  44. while (waitFlag) {
  45. waitFlag = Boolean.FALSE;
  46. // System.out.println(threadName + "轮询查看是否轮到自己");
  47. List<String> top = redisTemplate.opsForList().range(key, 0, 0);
  48. if (top != null && top.size() > 0) {
  49. // redis 里有数据
  50. if (!uuid.equals(top.get(0))) {
  51. // 队列顶部不是该接口,线程等待
  52. waitFlag = Boolean.TRUE;
  53. }
  54. }
  55. if (waitFlag) {
  56. // 根据接口执行平均时长来适度调整休眠时间,休眠时会让出cpu给其他的线程
  57. Thread.sleep(100);
  58. }
  59. }
  60. Object result = proceedingJoinPoint.proceed();
  61. // 执行结束,推出队列顶端元素
  62. redisTemplate.opsForList().leftPop(key);
  63. return result;
  64. }
  65. @AfterThrowing(value = "@annotation(com.backendsys.aspect.QueuingPoll)", throwing = "e")
  66. public void throwingAdvice(JoinPoint joinPoint, Exception e) {
  67. ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
  68. String tenantId = "null";
  69. if (attributes != null) {
  70. HttpServletRequest request = attributes.getRequest();
  71. tenantId = request.getParameter("tenantId");
  72. }
  73. String key = "jenson:list-thread:" + tenantId;
  74. // 抛出错误时也要推出队列顶端元素,否则后面的接口就堵死了
  75. redisTemplate.opsForList().leftPop(key);
  76. }
  77. }