在微服务架构中,我们将系统拆分成很多个服务单元,各单位的应用间通过服务注册与订阅的方式相互依赖。由于每个单元都在不同的进程中运行,依赖通过远程调用的方式执行,这样就有可能因为网络原因或是依赖服务自身问题出现调用故障或延迟,而这些问题会直接导致调用方的对外服务也出现延迟,若此时调用方的请求不断增加,最后就会因等待出现故障的依赖方响应形成任务积压,最终导致自身服务的不可用。这样的架构相对于传统架构更加的不稳定,为了解决这样的问题,就产生了断路器等一系列的服务保护机制。而Spring Cloud Hystrix就是这样的实现了服务降级、服务熔断、线程和信号量隔离、请求缓存、请求合并以及服务监控等一系列服务保护功能的组件。
本篇文章还是在前两篇文章的基础上所作的:
SpringCloud专题之一:Eureka
Spring Cloud专题之二:OpenFeign
欢迎大家查看!!!
先启动需要的服务工程:
- EUREKA-SERVER:注册中心,端口为9001
- HELLO-SERVER:提供服务的客户端,端口为9002和9003
- EUREKA-CUSTOMER:消费服务的消费者端,端口为9004
在未加入Hystrix(断路器)之前,如果我关闭掉一个客户端,那么使用消费者访问的时候可以获得如下的输出:
因为feign的默认连接时间是1s,所以超过1s后就会报连接不上的错。
Hystrix代码
由于 openfeign 包 默认集成了 hystrix,所以只需要开启开关即可
#开启Hystrix降级处理feign.hystrix.enabled=true
引入jar包
<!--hystrix--><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!--hystrix-javanica--><dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-javanica</artifactId></dependency>
1.服务降级
降级是指当请求超时,资源不足等情况发生时,进行的服务降级处理,不调用真实服务逻辑,而是使用快速失败的方式直接返回一个托底数据,保证服务链路的完整,避免服务雪崩。
降级的实现是指在调用远程服务的方法上增加@HystrixCommand的注解,通过指定fallbackMethod的值设置失败的回调方法,也可以使用@FeignClient的方式指定fallback类。
1.在Customer1Feign类的@FeignClient注解上添加fallback的类
/** * @className: Customer1Feign * @description: 测试多个feign使用相同的name的问题 * @author: charon * @create: 2021-06-06 09:42 */@FeignClient(value = "HELLO-SERVER",fallback = EurekaClientFallBack.class)public interface Customer1Feign { /** * 要求: * 必须要指定RequestParam属性的value值,同时RequestMethod的method也需要指定 * 方法上添加SpringMVC的注解 * @return */ @RequestMapping(value = "/sayHello1",method = RequestMethod.GET) String sayHello1(@RequestParam("name") String name);}
3.编写fallback使用的类:EurekaClientFallBack
/** * @className: EurekaClientFallBack * @description: 客户端的降级实现类 * @author: charon * @create: 2021-06-20 22:06 */@Componentpublic class EurekaClientFallBack implements Customer1Feign { /** * 日志记录类 */ private final Logger logger = LoggerFactory.getLogger(getClass()); /** * sayHello1接口的服务降级类 * @param name 参数 * @return */ @Override public String sayHello1(String name) { logger.error("您访问了EurekaClientFallBack#sayHello1(),传入的参数为:{}" , name); return "您访问了EurekaClientFallBack#sayHello1(),传入的参数为:" + name; } }
然后消费者端再次调用接口,会发现页面展示为如下图,而不是之前的Whitelabel Error Page了。
到这里,我们就实现了一个最简单的断路器功能了。
4.模拟实现提供服务的客户端代码执行超时的情况:
@RestControllerpublic class Hello1Controller { /** * 日志记录类 */ private final Logger logger = LoggerFactory.getLogger(getClass()); @Value("${server.port}") private String host; @Value("${spring.application.name}") private String instanceName; @RequestMapping("/sayHello1") public String sayHello1(@RequestParam("name") String name){ try { int sleepTime = new Random().nextInt(3000); logger.error("让线程阻塞 {} 毫秒",sleepTime); Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("你好,服务名:{},端口为:{},接收到的参数为:{}",instanceName,host,name); return "你好,服务名:"+instanceName+",端口为:"+host+",接收到的参数为:"+name; }}
在HELLO-SERVER的工程中让线程随机sleep几秒,然后消费者端调用,可以发现,当调用HELLO-SERVER超过1000毫秒时就会因为服务超时从而触发熔断请求,并调用回调逻辑返回结果。
除了上面的方式外,还可以使用@HystrixCommand的注解来配置fallbackMethod方法(更灵活)。
@HystrixCommand(fallbackMethod = "sayHello1Fallback")@Overridepublic String invokeSayHello1(String name) { long startTime = System.currentTimeMillis(); String result = feign1.sayHello1(name); logger.error("您访问了CustomerServiceImpl#sayHello1(),执行时间为:{} 毫秒",System.currentTimeMillis() - startTime ); return result;}public String sayHello1Fallback(String name){ logger.error("出错了,您访问了CustomerServiceImpl#sayHello1Fallback()" ); return "出错了,您访问了CustomerServiceImpl#sayHello1Fallback()";}
2.服务熔断
熔断就是当一定时间内,异常请求的比例(请求超时,网络故障,服务异常等)达到阈值,启动熔断器,熔断器一旦启动,则会停止调用具体的服务逻辑,通过fallback快速返回托底数据,保证服务链的完整。熔断又自动恢复的机制,如:当熔断器启动后,每隔5秒尝试将新的请求发送给服务端,如果服务可正常执行并返回结果,则关闭熔断器,服务恢复。如果仍然调用失败,则继续返回托底数据,熔断器处于开启状态。
服务降级是指调用服务出错了返回托底数据,而熔断则是出错后如果开启了熔断器将在一定的时间内不调用服务端。
熔断的实现是指在调用远程服务的方法上增加@HystrixCommand的注解。通过@HystrixProperty的name属性指定需要配置的属性(可以是字符串,也可以使用HystrixPropertiesManager常量类的常量),通过value设置属性的值,也可以通过setter方式设置属性值。
/** * 注解的配置意思为:当时间在20s内的10个请求中,当出现了30%(3个)的失败,则触发熔断 * @param name 参数 * @return 结果 */@HystrixCommand(fallbackMethod = "sayHello1Fallback",commandProperties = { @HystrixProperty(name="circuitBreaker.requestVolumeThreshold",value = "10"), @HystrixProperty(name= HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS,value = "20000"), @HystrixProperty(name= HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE,value = "30"),})@Overridepublic String invokeSayHello1(String name) { long startTime = System.currentTimeMillis(); String result = feign1.sayHello1(name); logger.error("您访问了CustomerServiceImpl#sayHello1(),执行时间为:{} 毫秒",System.currentTimeMillis() - startTime ); return result;}
Hystrix 常用属性配置
3.请求缓存
请求缓存是保证在一次请求中多次调用同一个服务提供者接口,在cacheKey不变的情况下,后续调用结果都是第一次的缓存结果,而不是多次请求服务提供者,从而降低服务提供者处理重复请求的压力。
设置请求缓存:
@Override@CacheResult//用来标记请求命令返回的结果应该被缓存@HystrixCommandpublic User getUserById( String id) { return feign1.getUserById(id);}
定义缓存key:
当使用注解来定义请求缓存时,若要为请求命令指定具体的缓存key的生成规则,可以使用@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定具体的生成函数,也可以通过@CacheKey注解在方法参数中指定用于组装缓存key的元素。
@Override@CacheResult(cacheKeyMethod = "getUserByIdCacheKey")//用来标记请求命令返回的结果应该被缓存@HystrixCommandpublic User getUserById( String id) { return feign1.getUserById(id);}public String getUserByIdCacheKey(String id){ return id;}/** * 第二种使用@CacheKey的方式,@CacheKey用来在请求命令的参数上标记,使其作为缓存的key值,如果没有标记则会使用所有参数,如果 * 同事还使用了@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定缓存Key的生成,那么该注解将不会起作用 * 有些教程中说使用这个可以指定参数,比如:@CacheKey("id"),在我这边会报错: * java.beans.IntrospectionException: Method not found: isId */@Override@CacheResult@HystrixCommandpublic User getUserById(@CacheKey String id) { return feign1.getUserById(id);}
清理缓存:
@CacheRemove注解的commandKey属性是必须要指定的,它用来指明需要使用请求缓存的请求命令,因为只有通过该属性的配置,Hystrix才能找到正确的请求命令缓存位置。
@Override@CacheRemove(commandKey = "getUserByIdCacheKey")//用来让请求命令的缓存失效,失效的缓存根据定义的key决定@HystrixCommandpublic User removeUserById( String id) { return feign1.getUserById(id);}
controller调用,注意定义是要在同一个请求中,如果是不同的请求,则没有效果。
@RequestMapping("/getUserById")public List<User> getUserById(String id){ User user1 = serivce.getUserById(id); User user2 = serivce.getUserById(id); List<User> lsrUser = new ArrayList<>(2); lsrUser.add(user1); lsrUser.add(user2); return lsrUser;}
4.请求合并
请求合并是指在一段时间内将所有请求合并为一个请求,以减少通信的消耗和线程数的占用,从而大大降低服务端的负载。
请求合并的缺点:
在设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms等待其他的请求一起,这样一个请求的耗时就从5ms增加到了15ms了。不过如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候,等待的时间消耗就显得微不足道了。所以如果需要设置请求合并,千万不能将等待时间设置的过大。
服务提供者的控制类:
@RestControllerpublic class UserBatchController { /** * 请求合并的方法 * @param ids * @return */ @RequestMapping(value = "/getUserList", method = RequestMethod.GET) public List<User> getUserList(String ids) { System.out.println("ids===:" + ids); String[] split = ids.split(","); return Arrays.asList(split) .stream() .map(id -> new User(Integer.valueOf(id),"charon"+id,Integer.valueOf(id)*5)) .collect(Collectors.toList()); } /** * 请求单个user的方法 * @param id * @return */ @RequestMapping(value = "/getUser/{id}", method = RequestMethod.GET) public User getUser(@PathVariable("id") String id) { User user = new User(1, "Charon",15); return user; }}
消费者feign的调用接口:
@RequestMapping(value = "/getUser",method = RequestMethod.GET)Future<User> getUser(@RequestParam("id")Integer id);@RequestMapping(value = "/getUserList",method = RequestMethod.GET)List<User> getUserList (@RequestParam("ids") String ids);
消费者的service及实现类:
Future<User> getUser(Integer i);/** * 表示在10s内的getUser请求将会合并到getUserList请求上,合并发出,最大的合并请求数为200 * @param userId 用户id * @return */@HystrixCollapser(batchMethod = "getUserList",scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = { @HystrixProperty(name="timerDelayInMilliseconds",value="10"), @HystrixProperty(name="maxRequestsInBatch",value="200") } )@Overridepublic Future<User> getUser(Integer userId){ Future<User> user = feign1.getUser(userId); return user;}@HystrixCommandpublic List<User> getUserList(List<Integer> userIdList) { List<User> lstUser = feign1.getUserList(StringUtils.join(userIdList,",")); return lstUser;}
消费者控制类:
/** * 获取单个用户 * @return User */@RequestMapping("/getUser")public User getUser() throws ExecutionException, InterruptedException { Future<User> user = serivce.getUser(1); System.out.println("返回的结果:"+user); return user.get();}/** * 获取用户list * @return list */@RequestMapping("/getUserList")public List<User> getUserList() throws ExecutionException, InterruptedException { Future<User> user1 = serivce.getUser(1); Future<User> user2= serivce.getUser(2); Future<User> user3= serivce.getUser(3); List<User> users = new ArrayList<>(); users.add(user1.get()); users.add(user2.get()); users.add(user3.get()); System.out.println("返回的结果:" + users); return users;}
标注了HystrixCollapser这个注解的,这个方法永远不会执行,当有请求来的时候,直接请求batchMethod所指定的方法。batchMethod的方法在指定延迟时间内会将所有的请求合并一起执行
5.线程池隔离
Hystrix使用舱壁模式实现线程池的隔离,它会为每一个依赖服务创建一个独立的线程池,这样就算某个依赖服务出现延迟过高的情况,也只是对该依赖服务的调用产生影响,而不会拖慢其他的依赖服务。
使用线程池隔离的优点:
- 应用自身得到完全保护,不会受不可控的依赖服务影响,即便给依赖服务分配的线程池被填满,也不会影响到其他的服务
- 可以有效降低接入新服务的风险,如果新服务接入后运行不稳定或存在问题,完全不会影响原来的请求
- 每个服务都是独立的线程池,在一定程度上解决了高并发的问题
- 由于线程池有个数限制,所以也解决了限流的问题
使用线程池隔离的缺点:
- 增加了CPU的开销,因为不仅有tomcat的线程池,还需要有Hystrix的线程池
- 每个操作都是独立的线程,就有排队、调度和上下文切换等问题
不配置线程隔离:
@RequestMapping("/useThread")public String useThread(){ return serivce.useThread1() + " " + serivce.useThread2();}@Overridepublic String useThread1() { String threadName = Thread.currentThread().getName(); logger.error("使用的线程名称为:{}",threadName); return "使用的线程名称为:" + threadName;}@Overridepublic String useThread2() { String threadName = Thread.currentThread().getName(); logger.error("使用的线程名称为:{}",threadName); return "使用的线程名称为:" + threadName;}
如果不配置线程隔离,则使用的是同一个线程
如果我们给useThread1方法设置线程隔离:
@HystrixCommand(groupKey = "useThread1",//分组,设置服务名,一个group使用一个线程 commandKey = "useThread1",//命令名称,默认值为当前执行的方法名称 threadPoolKey = "useThread1",//是配置线程池名称,配置全局唯一标识接口线程池的名称,相同名称的线程池是同一个。默认值是分组名groupKey threadPoolProperties = { @HystrixProperty(name = "coreSize", value = "30"),//线程池大小 @HystrixProperty(name = "maxQueueSize", value = "100"),//最大队列长度 @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),//线程存活时间 @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")//拒绝请求 })
使用了线程池隔离之后,可以看到两个请求使用的不通的线程池。
6.信号量隔离
信号量隔离是指在规定时间内只允许指定数量的信号量进行服务访问,其他得不到信号量的线程进入fallback,访问结束后,归还信号量。说白了就是做了一个限流。
@RequestMapping("/semaphore")public String semaphore(){ for (int i = 0; i < 15; i++) { new Thread(new Runnable() { @Override public void run() { serivce.semaphore(); } }).start(); } return "OK";}@HystrixCommand(fallbackMethod = "semaphoreFallback",commandProperties = { @HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE"), //使用信号量隔离,默认为THREAD @HystrixProperty(name="execution.isolation.semaphore.maxConcurrentRequests",value="10"), // 信号量最大并发度})@Overridepublic void semaphore() { try { Thread.sleep(900); } catch (InterruptedException e) { e.printStackTrace(); } logger.error("正常执行方法");}public void semaphoreFallback(){ logger.error("执行了fallback方法");}
如下图所示,有10个线程拿到了信号量,执行了正常的方法,有5个线程没有拿到信号量,直接调用fallback方法。
原理分析
上一篇文章说过openFeign主要是通过jdk的动态代理构建对象,所以Hystrix集成到feign当中也是使用的jdk动态代理的invocationHandler上,那么来看看Hystrix实现的jdk的动态代理类--HystrixInvocationHandler吧!
invoke方法:
@Overridepublic Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { // 如果调用的方法来自 java.lang.Object 则提前退出代码与 ReflectiveFeign.FeignInvocationHandler 相同 // ... HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { // 获取并调用MethodHandler,MethodHandler封装了Http请求,ribbon也在这里被集成 return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } // fallback的降级方法 @Override protected Object getFallback() { if (fallbackFactory == null) { return super.getFallback(); } try { Object fallback = fallbackFactory.create(getExecutionException()); Object result = fallbackMethodMap.get(method).invoke(fallback, args); if (isReturnsHystrixCommand(method)) { return ((HystrixCommand) result).execute(); } else if (isReturnsObservable(method)) { // Create a cold Observable return ((Observable) result).toBlocking().first(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return ((Single) result).toObservable().toBlocking().first(); } else if (isReturnsCompletable(method)) { ((Completable) result).await(); return null; } else if (isReturnsCompletableFuture(method)) { return ((Future) result).get(); } else { return result; } } catch (IllegalAccessException e) { // shouldn't happen as method is public due to being an interface throw new AssertionError(e); } catch (InvocationTargetException | ExecutionException e) { // Exceptions on fallback are tossed by Hystrix throw new AssertionError(e.getCause()); } catch (InterruptedException e) { // Exceptions on fallback are tossed by Hystrix Thread.currentThread().interrupt(); throw new AssertionError(e.getCause()); } } }; if (Util.isDefault(method)) { return hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { return hystrixCommand; } else if (isReturnsObservable(method)) { // Create a cold Observable return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } else if (isReturnsCompletableFuture(method)) { return new ObservableCompletableFuture<>(hystrixCommand); } return hystrixCommand.execute();}
首先创建一个HystrixCommand,用来表示对依赖服务的操作请求,同时传递所有需要的参数,从命名中可以知道才用了"命令模式"来实现对服务调用操作的封装。
命令模式:是指将来自客户端的请求封装成一个对象,从而让调用者使用不同的请求对服务提供者进行参数化。
上面的两种命令模式一共有4种命令的执行方式,Hystrix在执行的时候会根据创建的Command对象以及具体的情况来选择一个执行。
- execute() 方法 :同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误时抛出异常
- queue() 方法 :异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象
- observe()方法:返回Observable对象,它代表了操作的多个结果,是一个Hot observable
- toObservable()方法:同样返回一个Observable对象,也表示了操作的多个结果,但它返回的是一个Cold Observable
接下来首先来看看HystrixCommand#execute()方法:
public R execute() { try { // queue()返回一个Future,get()同步等待执行结束,然后获取异步的结果。 return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); }}
跟进queue()方法:
public Future<R> queue() { // 通过toObservable()获得一个Cold Observable, // 并且通过toBlocking()将该Observable转换成BlockingObservable,可以把数据以阻塞的方式发射出来 // toFuture()则是把BlockingObservable转换成一个Future final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { // future实现,调用delegate的对应实现 } return f; }
在queue()中调用了核心方法--toObservable()方法,
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // ... final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call(R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); return afterFirstApplication; } } }; final Action0 fireOnCompletedHook = new Action0() { @Override public void call() { try { executionHook.onSuccess(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); } } }; return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* this is a stateful object so can only be used once */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); // 先从缓存中获取如果有的话直接返回 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // put in cache if (requestCacheEnabled && cacheKey != null) { // 里面订阅了,所以开始执行hystrixObservable HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } });}
这个方法非常长,首先看看applyHystrixSemantics()方法:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { executionHook.onStart(_cmd); // 判断是否开启断路器 if (circuitBreaker.allowRequest()) { // 断路器是关闭的,则检查识都有可用的资源来执行命令 // 获取信号量实例 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { // 释放信号量 executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 尝试获取信号量 if (executionSemaphore.tryAcquire()) { try { // 执行业务 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 信号量获取失败,走fallback return handleSemaphoreRejectionViaFallback(); } } else { // 断路器是打开的,快速熔断,走fallback return handleShortCircuitViaFallback(); }}
applyHystrixSemantics()通过熔断器的allowRequest()方法判断是否需要快速失败走fallback,如果允许执行那么又会经过一层信号量的控制,都通过才会走execute。
所以,核心逻辑就落到了HystrixCircuitBreaker#allowRequest()方法上:
public boolean allowRequest() { // 强制开启熔断 if (properties.circuitBreakerForceOpen().get()) { return false; } // 强制关闭熔断 if (properties.circuitBreakerForceClosed().get()) { isOpen(); return true; } // 判断和计算当前断路器是否打开 或者 允许单个测试 ,通过这两个方法的配合,实现了断路器的打开和关闭状态的切换 return !isOpen() || allowSingleTest();}
Hystrix允许强制开启或者关闭熔断,如果不想有请求执行就开启,如果觉得可以忽略所有错误就关闭。在没有强制开关的情况下,主要就是判断当前熔断是否开启。另外,在熔断器开启的情况下,会在一定时间后允许发出一个测试的请求,来判断是否开启熔断器。
首先来看看isOpen()方法:
public boolean isOpen() { if (......原文转载:http://www.shaoqun.com/a/833922.html
跨境电商:https://www.ikjzd.com/
amazon go:https://www.ikjzd.com/w/67
vat:https://www.ikjzd.com/w/109
mail.ru:https://www.ikjzd.com/w/2232
在微服务架构中,我们将系统拆分成很多个服务单元,各单位的应用间通过服务注册与订阅的方式相互依赖。由于每个单元都在不同的进程中运行,依赖通过远程调用的方式执行,这样就有可能因为网络原因或是依赖服务自身问题出现调用故障或延迟,而这些问题会直接导致调用方的对外服务也出现延迟,若此时调用方的请求不断增加,最后就会因等待出现故障的依赖方响应形成任务积压,最终导致自身服务的不可用。这样的架构相对于传统架构更加
百思买:https://www.ikjzd.com/w/394
赶不上潮流?曾经的电商巨头eBay已经被淘汰?:https://www.ikjzd.com/articles/22000
应对中邮涨价,Wish提供运费补贴方案详解:https://www.ikjzd.com/articles/22003
欧盟寻求和美国贸易谈判,特朗普喊话这样说:https://www.ikjzd.com/articles/22007
跨境电商新手卖家须掌握的产品数据分析工具和方法:https://www.ikjzd.com/articles/22009
三个男人和我玩4P 两个㖭上面一个㖭下面:http://lady.shaoqun.com/a/247878.html
女班长在我身上娇喘 班长你下面夹得我好爽:http://lady.shaoqun.com/a/248256.html
无耻公公饥渴太久要我给他一次:http://lady.shaoqun.com/m/a/41187.html
Prime Day后大复盘:小卖遗憾陪跑,大卖爆单未半而中道崩殂:https://www.ikjzd.com/articles/146115
亚马逊要求有效追踪率高于95%,卖家该怎么发货才有效?:https://www.ikjzd.com/articles/146102
北欧运费突破20000美元大关,跨太平洋至美西报价高达25000美元:https://www.ikjzd.com/articles/146122
有多少人做噩梦把学生当猎物,一年换一次女生,教了33年?:http://lady.shaoqun.com/a/390527.html
没有评论:
发表评论