Resilience4J的BulkHead舱壁隔离

时间:2024-6-25    作者:老大夫    分类: SpringCloud


Resilience4J提供了两种隔离方式

作用:隔离风险,防止连坐出错

  • SemaphoreBulkhead 信号量舱壁
  • FixedThreadPoolBulkhead

使用 SemaphoreBulkhead 信号量舱壁

  1. 修改该provider的controller
//=========Resilience4j bulkhead 的例子
@GetMapping(value = "/pay/bulkhead/{id}")
public String myBulkhead(@PathVariable("id") Integer id)
{
    if(id == -4) throw new RuntimeException("----bulkhead id 不能-4");

    if(id == 9999)
    {
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
    }

    return "Hello, bulkhead! inputId:  "+id+" \t " + IdUtil.simpleUUID();
}
  1. 修改该OpenFeign的API接口
@FeignClient("cloud-payment-service")
public interface PayFeignApi {

    @PostMapping("/pay/add")
    public ResultData addPay(@RequestBody PayDTO payDTO);

    @GetMapping("/pay/get/{id}")
    public ResultData getPayInfo(@PathVariable("id") Integer id);

    @GetMapping("pay/get/info")
    public String myLb();

    @GetMapping(value = "/pay/circuit/{id}")
    public String myCircuit(@PathVariable("id") Integer id);

    @GetMapping(value = "/pay/bulkhead/{id}")
    public String myBulkhead(@PathVariable("id") Integer id);
}
  1. consumer-openFeign引入pom依赖
<!--resilience4j-bulkhead-->
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
</dependency>
  1. consumer-openfeiign-yml配置
server:
  port: 80

spring:
  application:
    name: cloud-consumer-openfeign-order
  ####Spring Cloud Consul for Service Discovery
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        prefer-ip-address: true #优先使用服务ip进行注册
        service-name: ${spring.application.name}
    openfeign:
      client:
        config:
          default:
            #cloud-payment-service:
            #连接超时时间,为避免演示出错,讲解完本次内容后设置为20秒
            connectTimeout: 20000
            #读取超时时间,为避免演示出错,讲解完本次内容后设置为20秒
            readTimeout: 20000
            #开启httpclient5
      httpclient:
        hc5:
          enabled: true
          #开启压缩特性
      compression:
        request:
          enabled: true
          min-request-size: 2048
          mime-types: text/xml,application/xml,application/json
        response:
          enabled: true
      #开启circuitbreaker和分组激活
      circuitbreaker:
        enabled: true
        group:
          enabled: true #没开分组永远不用分组的配置。精确优先、分组次之(开了分组)、默认最后

# feign日志以什么级别监控哪个接口
logging:
  level:
    com:
      atguigu:
        cloud:
          apis:
            PayFeignApi: debug

# Resilience4j CircuitBreaker 按照时间:TIME_BASED 的例子
#resilience4j:
#  timelimiter:
#    configs:
#      default:
#        timeout-duration: 10s #神坑的位置,timelimiter 默认限制远程1s,超于1s就超时异常,配置了降级,就走降级逻辑
#  circuitbreaker:
#    configs:
#      default:
#        failureRateThreshold: 50 #设置50%的调用失败时打开断路器,超过失败请求百分⽐CircuitBreaker变为OPEN状态。
#        slowCallDurationThreshold: 2s #慢调用时间阈值,高于这个阈值的视为慢调用并增加慢调用比例。
#        slowCallRateThreshold: 30 #慢调用百分比峰值,断路器把调用时间⼤于slowCallDurationThreshold,视为慢调用,当慢调用比例高于阈值,断路器打开,并开启服务降级
#        slidingWindowType: TIME_BASED # 滑动窗口的类型
#        slidingWindowSize: 2 #滑动窗口的大小配置,配置TIME_BASED表示2秒
#        minimumNumberOfCalls: 2 #断路器计算失败率或慢调用率之前所需的最小样本(每个滑动窗口周期)。
#        permittedNumberOfCallsInHalfOpenState: 2 #半开状态允许的最大请求数,默认值为10。
#        waitDurationInOpenState: 5s #从OPEN到HALF_OPEN状态需要等待的时间
#        recordExceptions:
#          - java.lang.Exception
#    instances:
#      cloud-payment-service:
#        baseConfig: default

####resilience4j bulkhead 的例子
resilience4j:
  bulkhead:
    configs:
      default:
        maxConcurrentCalls: 2 # 隔离允许并发线程执行的最大数量
        maxWaitDuration: 1s # 当达到并发调用数量时,新的线程的阻塞时间,我只愿意等待1秒,过时不候进舱壁兜底fallback
    instances:
      cloud-payment-service:
        baseConfig: default
  timelimiter:
    configs:
      default:
        timeout-duration: 20s
  1. openfeign的controller修改
/**
 *(船的)舱壁,隔离
 * @param id
 * @return
 */
@GetMapping(value = "/feign/pay/bulkhead/{id}")
@Bulkhead(name = "cloud-payment-service",fallbackMethod = "myBulkheadFallback",type = Bulkhead.Type.SEMAPHORE)
public String myBulkhead(@PathVariable("id") Integer id)
{
    return payFeignApi.myBulkhead(id);
}
public String myBulkheadFallback(Throwable t)
{
    return "myBulkheadFallback,隔板超出最大数量限制,系统繁忙,请稍后再试-----/(ㄒoㄒ)/~~";
}

使用FixedThreadPoolBulkhead固定线程池舱壁

  1. 引入POM
<!--resilience4j-bulkhead-->
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
</dependency>
  1. 修改yml
####resilience4j bulkhead -THREADPOOL的例子
resilience4j:
  timelimiter:
    configs:
      default:
        timeout-duration: 10s #timelimiter默认限制远程1s,超过报错不好演示效果所以加上10秒
  thread-pool-bulkhead:
    configs:
      default:
        core-thread-pool-size: 1
        max-thread-pool-size: 1
        queue-capacity: 1
    instances:
      cloud-payment-service:
        baseConfig: default
# spring.cloud.openfeign.circuitbreaker.group.enabled 请设置为false 新启线程和原来主线程脱离
  1. 修改OpenFeign的controller
@RestController
public class OrderCircuitController
{
    @Resource
    private PayFeignApi payFeignApi;

    @GetMapping(value = "/feign/pay/circuit/{id}")
    @CircuitBreaker(name = "cloud-payment-service", fallbackMethod = "myCircuitFallback")
    public String myCircuitBreaker(@PathVariable("id") Integer id)
    {
        return payFeignApi.myCircuit(id);
    }
    //myCircuitFallback就是服务降级后的兜底处理方法
    public String myCircuitFallback(Integer id,Throwable t) {
        // 这里是容错处理逻辑,返回备用结果
        return "myCircuitFallback,系统繁忙,请稍后再试-----/(ㄒoㄒ)/~~";
    }

//    /**
//     *(船的)舱壁,隔离
//     * @param id
//     * @return
//     */
//    @GetMapping(value = "/feign/pay/bulkhead/{id}")
//    @Bulkhead(name = "cloud-payment-service",fallbackMethod = "myBulkheadFallback",type = Bulkhead.Type.SEMAPHORE)
//    public String myBulkhead(@PathVariable("id") Integer id)
//    {
//        return payFeignApi.myBulkhead(id);
//    }
//    public String myBulkheadFallback(Throwable t)
//    {
//        return "myBulkheadFallback,隔板超出最大数量限制,系统繁忙,请稍后再试-----/(ㄒoㄒ)/~~";
//    }
    /**
     *(船的)舱壁,隔离 threadPool
     * @param id
     * @return
     */
    @GetMapping(value = "/feign/pay/bulkhead/{id}")
    @Bulkhead(name = "cloud-payment-service",fallbackMethod = "myBulkheadPoolFallback",type = Bulkhead.Type.THREADPOOL)
    public CompletableFuture<String> myBulkheadTHREADPOOL(@PathVariable("id") Integer id)
    {
        System.out.println(Thread.currentThread().getName()+"\t"+"-----开始进入");
        try {
            TimeUnit.SECONDS.sleep(3);
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"\t"+"-----准备离开");
        return CompletableFuture.supplyAsync(()->payFeignApi.myBulkhead(id)+"\t"+"Bulkhead.Type.THREADPOOL");
    }
    public CompletableFuture<String> myBulkheadPoolFallback(Throwable t)
    {
        return CompletableFuture.supplyAsync(()->"Bulkhead.Type.THREADPOOL");
    }
}


扫描二维码,在手机上阅读

推荐阅读: