业务场景

前台注册时,给用户的邮箱异步发送邮件用于激活账户.服务器资源紧张不能引入MQ队列来实现,所以用JUC写了个消费者功能废话不多说直接上代码

项目结构

Spring Boot Web项目ConcurrentLinkedQueue+ReentrantLock+Condition实现消费者功能(类似MQ队列)

代码

消费者核心代码
/**  * 描述: 消费者线程  * date: 2020/6/22 0022  **/ @Slf4j public class ConsumerTask implements Runnable {      public void run() {         doConsumer();     }      private void doConsumer() {         ConsumerTaskHolder.lock.lock();         try{             while (true){                  while (ConsumerTaskHolder.queue.size() == 0){                     log.info("队列中无数据等待数据中......");                     ConsumerTaskHolder.notEmpty.await();                 }                  //消费邮箱                 String email = ConsumerTaskHolder.queue.poll();                 log.info("收到邮箱:{},发送邮件......",email);                 // 处理逻辑 发送邮件..... some action             }          } catch (InterruptedException e) {             e.printStackTrace();         } finally {             ConsumerTaskHolder.lock.unlock();         }     } } 
生产者代码
/**  * 描述: 提供者主要用于给队列推送数据  * date: 2020/6/22 0022  **/ public class ProviderTask {      /**      * 给队列推送数据      * @param email      */     public static void pushQueue(String email){         ConsumerTaskHolder.lock.lock();         try {             ConsumerTaskHolder.queue.add(email);             //唤醒消费者线程处理任务啦             ConsumerTaskHolder.notEmpty.signalAll();         }finally {             ConsumerTaskHolder.lock.unlock();         }     } } 
生产者和消费者共同使用的队列等资源代码
/**  * 描述: 消费者相关资源  * date: 2020/6/22 0022  **/ public class ConsumerTaskHolder {     public static Lock lock = new ReentrantLock();     public static Condition notEmpty = lock.newCondition();     /**      * 存放数据的队列      */     public static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue();  } 
项目启动加载线程池代码
/**  * 描述: 线程池启动加载  * date: 2020/6/22 0022  **/ @Slf4j @Component @Order(0) public class ConsumerThreadPool implements ApplicationRunner {     /**      * 线程池      */     public static final ExecutorService executorService = Executors.newFixedThreadPool(5);      public void run(ApplicationArguments args) {         log.info("加载线程池中.........");         try {             ConsumerTask consumerTask = new ConsumerTask();             executorService.submit(consumerTask);             log.info("线程池启动完成......");         } catch (Exception e) {             log.error("线程池启动报错了.......");             e.printStackTrace();         }     }       /**      * 项目销毁前执行      */     @PreDestroy     public void destroy() {         try {             executorService.shutdown();//优雅的关闭             log.info("关闭线程池...."); //          List<Runnable> runnables = executorService.shutdownNow();//直接关闭 //          log.debug("未执行完的任务,"+runnables);         } catch (Exception e) {             log.error("线程池关闭失败...", e);             e.printStackTrace();         }      }   } 

项目启动后如图

Spring Boot Web项目ConcurrentLinkedQueue+ReentrantLock+Condition实现消费者功能(类似MQ队列)

Controller和Service代码

@RequestMapping("/system") @Controller public class SystemController {      @Autowired     UserService userService;      @GetMapping("/{path}")     public String index(@PathVariable  String path){         return path;     }      @ResponseBody     @PostMapping("/register")     public Map register(User user){         Map resultMap = new HashMap();         userService.register(user);         return resultMap;     } } 
@Slf4j @Service public class UserServiceImpl implements UserService {      public void register(User user) {         String email = user.getEmail();         //给队列推送邮箱账号过去,队列处理发送邮件         ProviderTask.pushQueue(email);          //some action         log.info("保存用户信息到数据库");     } } 

前台页面推送数据

Spring Boot Web项目ConcurrentLinkedQueue+ReentrantLock+Condition实现消费者功能(类似MQ队列)

控制台效果如图

Spring Boot Web项目ConcurrentLinkedQueue+ReentrantLock+Condition实现消费者功能(类似MQ队列)

队列正常消费接收到邮箱号就处理发送邮件.

总结

项目启动后消费者就会等待数据,生产者推送一个数据过去后唤醒消费者,消费者线程接收到唤醒信号去消费数据.反复如此,由于是Demo所以代码比较简单,实际业务中的问题自行完善即可.

项目地址

项目地址