JUC中的CompletableFuture异步编排

时间:2024-8-31    作者:老大夫    分类: 乐尚代驾


二、CompletableFuture异步编排

1、CompletableFuture异步编排

1.1、CompletableFuture介绍

  • 问题:司机结束代驾服务页面非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。

假如司机结束代驾服务的每个查询,需要如下标注的时间才能完成

  1. 获取订单信息 1s
  2. 计算防止刷单 0.5s
  3. 计算订单实际里程 0.5s
  4. 计算订单实际代驾费用 1s
  5. ......
  • 那么,司机需要4s后才能结束代驾服务。很显然是不能接受的。如果有多个线程同时完成这多步操作,也许只需要1.1s即可完成响应。

  • 使用CompletableFuture可用于线程异步编排,使原本串行执行的代码,变为并行执行,提高代码执行速度。

1.2、CompletableFuture使用

说明:使用CompletableFuture异步编排大多方法都会有一个重载方法,会多出一个executor参数,用来传来自定义的线程池,如果不传就会使用默认的线程池。

1.2.1、创建异步编排对象
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
1.2.2、线程串行方法
// 使线程串行执行,无入参,无返回值
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

// 使线程串行执行,有入参,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

// 使线程串行执行,有入参,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
1.2.3、多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
1.2.4、代码示例
package com.atguigu.daijia.driver;

import lombok.SneakyThrows;

import java.util.concurrent.*;

public class CompletableFutureTest5 {

    @SneakyThrows
    public static void main(String[] args) {
        //动态获取服务器核数
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                processors+1, // 核心线程个数 io:2n ,cpu: n+1  n:内核数据
                processors+1,
                0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> "任务1", executor);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> "任务2", executor);
        CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务3";
        }, executor);

        // 串联起若干个线程任务, 没有返回值
        CompletableFuture<Void> all = CompletableFuture.allOf(future01, future02, future03);
        // 等待所有线程执行完成
        // .join()和.get()都会阻塞并获取线程的执行情况
        // .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理
        all.join();
        all.get();
    }
}   

2、结束代驾

2.1、ThreadPoolConfig

全局自定义线程池配置

package com.atguigu.daijia.driver.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 全局自定义线程池配置
 */
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        //动态获取服务器核数
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                processors+1, // 核心线程个数 io:2n ,cpu: n+1  n:内核数据
                processors+1,
                0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        //  返回线程池对象
        return threadPoolExecutor;
    }
}

2.2、OrderServiceImpl

@Autowired
private ThreadPoolExecutor threadPoolExecutor;

@SneakyThrows
@Override
public Boolean endDrive(OrderFeeForm orderFeeForm) {
   //1.获取订单信息
   CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> {
      OrderInfo orderInfo = orderInfoFeignClient.getOrderInfo(orderFeeForm.getOrderId()).getData();
      return orderInfo;
   }, threadPoolExecutor);

   //2.防止刷单,计算司机的经纬度与代驾的终点经纬度是否在2公里范围内
   CompletableFuture<OrderServiceLastLocationVo> orderServiceLastLocationVoCompletableFuture = CompletableFuture.supplyAsync((() -> {
      OrderServiceLastLocationVo orderServiceLastLocationVo = locationFeignClient.getOrderServiceLastLocation(orderFeeForm.getOrderId()).getData();
      return orderServiceLastLocationVo;
   }), threadPoolExecutor);

   //合并
   CompletableFuture.allOf(orderInfoCompletableFuture,
         orderServiceLastLocationVoCompletableFuture
   ).join();

   //获取数据
   OrderInfo orderInfo = orderInfoCompletableFuture.get();
   //2.1.判断刷单
   OrderServiceLastLocationVo orderServiceLastLocationVo = orderServiceLastLocationVoCompletableFuture.get();
   //司机的位置与代驾终点位置的距离
   double distance = LocationUtil.getDistance(orderInfo.getEndPointLatitude().doubleValue(), orderInfo.getEndPointLongitude().doubleValue(), orderServiceLastLocationVo.getLatitude().doubleValue(), orderServiceLastLocationVo.getLongitude().doubleValue());
   if(distance > SystemConstant.DRIVER_START_LOCATION_DISTION) {
      throw new GuiguException(ResultCodeEnum.DRIVER_END_LOCATION_DISTION_ERROR);
   }

   //3.计算订单实际里程
   CompletableFuture<BigDecimal> realDistanceCompletableFuture = CompletableFuture.supplyAsync(() -> {
      BigDecimal realDistance = locationFeignClient.calculateOrderRealDistance(orderFeeForm.getOrderId()).getData();
      log.info("结束代驾,订单实际里程:{}", realDistance);
      return realDistance;
   }, threadPoolExecutor);

   //4.计算代驾实际费用
   CompletableFuture<FeeRuleResponseVo> feeRuleResponseVoCompletableFuture = realDistanceCompletableFuture.thenApplyAsync((realDistance)->{
      FeeRuleRequestForm feeRuleRequestForm = new FeeRuleRequestForm();
      feeRuleRequestForm.setDistance(realDistance);
      feeRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
      //等候时间
      Integer waitMinute = Math.abs((int) ((orderInfo.getArriveTime().getTime() - orderInfo.getAcceptTime().getTime()) / (1000 * 60)));
      feeRuleRequestForm.setWaitMinute(waitMinute);
      log.info("结束代驾,费用参数:{}", JSON.toJSONString(feeRuleRequestForm));
      FeeRuleResponseVo feeRuleResponseVo = feeRuleFeignClient.calculateOrderFee(feeRuleRequestForm).getData();
      log.info("费用明细:{}", JSON.toJSONString(feeRuleResponseVo));
      //订单总金额 需加上 路桥费、停车费、其他费用、乘客好处费
      BigDecimal totalAmount = feeRuleResponseVo.getTotalAmount().add(orderFeeForm.getTollFee()).add(orderFeeForm.getParkingFee()).add(orderFeeForm.getOtherFee()).add(orderInfo.getFavourFee());
      feeRuleResponseVo.setTotalAmount(totalAmount);
      return feeRuleResponseVo;
   });

   //5.计算系统奖励
   //5.1.获取订单数
   CompletableFuture<Long> orderNumCompletableFuture = CompletableFuture.supplyAsync(() -> {
      String startTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 00:00:00";
      String endTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 24:00:00";
      Long orderNum = orderInfoFeignClient.getOrderNumByTime(startTime, endTime).getData();
      return orderNum;
   }, threadPoolExecutor);
   //5.2.封装参数
   CompletableFuture<RewardRuleResponseVo> rewardRuleResponseVoCompletableFuture = orderNumCompletableFuture.thenApplyAsync((orderNum)->{
      RewardRuleRequestForm rewardRuleRequestForm = new RewardRuleRequestForm();
      rewardRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
      rewardRuleRequestForm.setOrderNum(orderNum);
      //5.3.执行
      RewardRuleResponseVo rewardRuleResponseVo = rewardRuleFeignClient.calculateOrderRewardFee(rewardRuleRequestForm).getData();
      log.info("结束代驾,系统奖励:{}", JSON.toJSONString(rewardRuleResponseVo));
      return rewardRuleResponseVo;
   });

   //6.计算分账信息
   CompletableFuture<ProfitsharingRuleResponseVo> profitsharingRuleResponseVoCompletableFuture = feeRuleResponseVoCompletableFuture.thenCombineAsync(orderNumCompletableFuture, (feeRuleResponseVo, orderNum)->{
      ProfitsharingRuleRequestForm profitsharingRuleRequestForm = new ProfitsharingRuleRequestForm();
      profitsharingRuleRequestForm.setOrderAmount(feeRuleResponseVo.getTotalAmount());
      profitsharingRuleRequestForm.setOrderNum(orderNum);
      ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleFeignClient.calculateOrderProfitsharingFee(profitsharingRuleRequestForm).getData();
      log.info("结束代驾,分账信息:{}", JSON.toJSONString(profitsharingRuleResponseVo));
      return profitsharingRuleResponseVo;
   });
   CompletableFuture.allOf(orderInfoCompletableFuture,
         realDistanceCompletableFuture,
         feeRuleResponseVoCompletableFuture,
         orderNumCompletableFuture,
         rewardRuleResponseVoCompletableFuture,
         profitsharingRuleResponseVoCompletableFuture
   ).join();

   //获取执行结果
   BigDecimal realDistance = realDistanceCompletableFuture.get();
   FeeRuleResponseVo feeRuleResponseVo = feeRuleResponseVoCompletableFuture.get();
   RewardRuleResponseVo rewardRuleResponseVo = rewardRuleResponseVoCompletableFuture.get();
   ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleResponseVoCompletableFuture.get();

   //7.封装更新订单账单相关实体对象
   UpdateOrderBillForm updateOrderBillForm = new UpdateOrderBillForm();
   updateOrderBillForm.setOrderId(orderFeeForm.getOrderId());
   updateOrderBillForm.setDriverId(orderFeeForm.getDriverId());
   //路桥费、停车费、其他费用
   updateOrderBillForm.setTollFee(orderFeeForm.getTollFee());
   updateOrderBillForm.setParkingFee(orderFeeForm.getParkingFee());
   updateOrderBillForm.setOtherFee(orderFeeForm.getOtherFee());
   //乘客好处费
   updateOrderBillForm.setFavourFee(orderInfo.getFavourFee());

   //实际里程
   updateOrderBillForm.setRealDistance(realDistance);
   //订单奖励信息
   BeanUtils.copyProperties(rewardRuleResponseVo, updateOrderBillForm);
   //代驾费用信息
   BeanUtils.copyProperties(feeRuleResponseVo, updateOrderBillForm);

   //分账相关信息
   BeanUtils.copyProperties(profitsharingRuleResponseVo, updateOrderBillForm);
   updateOrderBillForm.setProfitsharingRuleId(profitsharingRuleResponseVo.getProfitsharingRuleId());
   log.info("结束代驾,更新账单信息:{}", JSON.toJSONString(updateOrderBillForm));

   //8.结束代驾更新账单
   orderInfoFeignClient.endDrive(updateOrderBillForm);
   return true;
}


扫描二维码,在手机上阅读

推荐阅读: