查看原文
其他

基于Sentinel的亿级网关流量复制方案

李明月 搜狐技术产品 2023-02-23


  

本文字数:10725

预计阅读时间:27分钟



1. 前言

API 网关是随着微服务(Microservice)这个概念一起兴起的一种架构模式,它用于解决微服务过于分散,没有一个统一的出入口进行流量管理的问题。

当使用微服务构建整个 API 服务时,一般会有许许多多职责不同的应用在运行着,这些应用会需要一些通用的功能,例如鉴权、流控、监控、日志统计等。

在传统的单体应用中,这些功能一般都是内嵌在应用中,作为一个组件运行。但是在微服务模式下,不同种类且独立运行的应用可能会有数十甚至数百种,继续使用这种方式会造成非常高的管理和发布成本。所以就需要在这些应用上抽象出一个统一的流量入口,完成这些功能的实现。

Setinel是分布式系统的流量防卫兵,它以流量为切入点,从流量控制、流量路由、熔断降级、系统自适应过载保护、热点流量防护等多个维度保护服务的稳定性。

因此,在API网关中结合Sentinel的流控能力,再根据业务需求加以定制,可以有效地实现对大规模流量(本例中API网关的日均流量有1亿+)的实时控制,从而满足业务实践的需要。

流量复制是一种控制流量的具体形式,是指网关将发往某一个服务的流量复制一份,转发到另外一个服务上。流量复制可以使用线上真实流量进行服务功能的验证和服务性能的压测。本文以下将重点介绍在Sentinel现有流控架构上进行定制,实现流量复制功能的具体实践。

2. Sentinel的基本概念

资源:可以是任何东西,服务,服务里的方法,甚至是一段代码。使用 Sentinel 来进行资源保护,主要分为几个步骤:

  1. 定义资源
  2. 定义规则
  3. 检验规则是否生效

先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。

3. Sentinel的流程架构

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Sentinel采用的是责任链模式,对于规则的检查和处理由一系列槽链构成。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot  负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot  则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot  则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot  则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot  则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot  则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot  则通过系统的状态,例如 load1 等,来控制总的入口流量;总体的框架如下:

对于网关流控来说,我们对GatewayFlowSlot进行了自定义,结合sentinel本身对流量的定义能力,实现了对特定流量的管控和复制能力。如下图所示:这里展示的是流量集中管控的流程架构,首先从规则管理后台,配置网关流控的相关规则,会在内部转换生成热点参数规则。在外部请求进入的时候,通过SentinelGateway过滤器进行路由或者API分组的识别和请求属性的解析,然后将解析后的参数传入Sentinel槽链,在GatewayFlowSlot槽进行网关规则的检查,进而实现流量的集中控制。

4. 流量复制的具体实现

当请求进入网关后,首先会经过SentinelGatewayFilter的处理,SentinelGatewayFilter会读取API分组定义,进行请求的匹配,代码如下:

@Slf4j
public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter, Ordered {

//......省略部分代码......

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        Mono<Void> asyncResult = chain.filter(exchange);
        if (route != null) {
            // 进行Route ID匹配
            asyncResult = onResourceModeRouteId(exchange, route, asyncResult);
            // 进行API分组匹配
            asyncResult = onResourceModelCustomApiName(exchange, route, asyncResult);
        }
        return asyncResult;
    }

    //......省略部分代码......

    private Mono<Void> onResourceModelCustomApiName(ServerWebExchange exchange, Route route, Mono<Void> asyncResult) {
        Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
        for (String apiName : matchingApis) {
            // API分组匹配成功,进行请求属性的解析,并且将customizedRuleList::add函数追加到参数末尾,以便传递入Sentinel处理槽链中GatewayFlowSlot槽的处理逻辑中
            Object[] params = paramParser.parseParameterFor(apiName, exchange,
                    r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
            Object[] paramsWithConsumer = new Object[params.length + 1];
            System.arraycopy(params, 0, paramsWithConsumer, 0, params.length);
            final List<ParamFlowRule> customizedRuleList = new ArrayList<>();
            paramsWithConsumer[params.length] = (GatewayFlowSlot.GatewayFlowCustomizedRuleContainer)customizedRuleList::add;
            exchange.getAttributes().put(GatewayFlowSlot.CUSTOMIZED_RULE_LIST_KEY, customizedRuleList);
            EntryConfig entryConfig = this.getCustomizedEntryConfig(
                    exchange,
                    route,
                    apiName,
                    paramsWithConsumer,
                    null
            );
            asyncResult = asyncResult.transform(new SentinelReactorTransformer<>(entryConfig));
        }
        return asyncResult;
    }

    Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
        return GatewayApiMatcherManager.getApiMatcherMap().values()
                .stream()
                .filter(m -> m.test(exchange))
                .map(WebExchangeApiMatcher::getApiName)
                .collect(Collectors.toSet());
    }

    //......省略部分代码......

    private EntryConfig getCustomizedEntryConfig(
            ServerWebExchange exchange,
            Route route,
            String resourceName,
            Object[] params,
            ContextConfig contextConfig
    ) 
{
        EntryConfig entryConfig = new EntryConfig(
                resourceName,
                ResourceTypeConstants.COMMON_API_GATEWAY,
                EntryType.IN,
                1,
                params,
                contextConfig
        );
        entryConfig.setSubscribeCallBack(() -> this.customizedEntryConfigSubscribeCallBacks.forEach(it -> it.accept(exchange, route)));
        return entryConfig;
    }

    private final List<BiConsumer<ServerWebExchange, Route>> customizedEntryConfigSubscribeCallBacks =
            Stream.<BiConsumer<ServerWebExchange, Route>>of(
                            this::alteredServiceConfig,
                            this::copyServiceConfig
                    )
                    .collect(Collectors.toList());

    //......省略部分代码......

    private void copyServiceConfig(ServerWebExchange exchange, Route route) {
        List<ParamFlowRule> customizedRuleList = exchange.getAttribute(GatewayFlowSlot.CUSTOMIZED_RULE_LIST_KEY);
        if(customizedRuleList == nullreturn;
        List<ParamFlowRule.CopyPercentServiceId> list = null;
        boolean compare = false;
        for(ParamFlowRule rule : customizedRuleList){
            list = rule.getCopyPercentServiceIds();
            compare = rule.isCopyServiceIdCompare();
        }
        List<ParamFlowRule.CopyPercentServiceId> copyPercentServiceIds =
                Optional.ofNullable(list)
                        .orElseGet(ArrayList::new)
                        .stream()
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());
        List<String> copyServiceIdList = null;
        if (!copyPercentServiceIds.isEmpty()) {
            copyServiceIdList = copyPercentServiceIds
                    .stream()
                    .filter(copyPercentServiceId ->
                            copyPercentServiceId.getPercent() > ThreadLocalRandom.current().nextInt(100))
                    .map(ParamFlowRule.CopyPercentServiceId::getCopyServiceId)
                    .collect(Collectors.toList());
        }
        if (copyServiceIdList == null) {
            for(ParamFlowRule rule : customizedRuleList){
                if(rule.getCopyServiceIdList() != null){
                    copyServiceIdList = rule.getCopyServiceIdList();
                    if(log.isDebugEnabled()){
                        log.debug("setting {}={}, {}={}", GatewayFlowSlot.COPY_SERVICE_ID_LIST_KEY, rule.getCopyServiceIdList(),
                                GatewayFlowSlot.COPY_SERVICE_ID_COMPARE_KEY, rule.isCopyServiceIdCompare());
                    }
                }
            }        
        }
        if (copyServiceIdList != null && !copyServiceIdList.isEmpty()) {
            // 将要复制请求的服务名列表写入exchange属性中
            exchange.getAttributes().put(GatewayFlowSlot.COPY_SERVICE_ID_LIST_KEY, copyServiceIdList);
            exchange.getAttributes().put(GatewayFlowSlot.COPY_SERVICE_ID_COMPARE_KEY, compare);
        }
    }
}

其中 private Mono<Void> onResourceModelCustomApiName(ServerWebExchange exchange, Route route, Mono<Void> asyncResult)方法中的pickMatchingApiDefinitions用来进行API分组的匹配,如果有命中的请求,则进行entryConfig的构建,entryConfig中包括private void copyServiceConfig(ServerWebExchange exchange, Route route)方法的回调。SentinelReactorTransformer封装了Sentinel槽链处理逻辑,其中的GatewayFlowSlot会对请求做具体的判断处理,代码如下:

@SpiOrder(-4000)  
public class GatewayFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {  
  
   //......省略部分代码......
  
    private void checkGatewayParamFlow(ResourceWrapper resourceWrapper, int count, Object... args)  
        throws BlockException 
{  
        if (args == null) {  
            return;  
  }  
  
        List<ParamFlowRule> rules = GatewayRuleManager.getConvertedParamRules(resourceWrapper.getName());  
 if (rules == null || rules.isEmpty()) {  
            return;  
  }  
  
        for (ParamFlowRule rule : rules) {  
            // Initialize the parameter metrics.  
  ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);  
 if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {  
                if(rule.isCustomized()){  
                    for(Object object : args){  
                        if(object instanceof GatewayFlowSlot.GatewayFlowCustomizedRuleContainer){  
 // 执行customizedRuleList::add函数                           ((GatewayFlowSlot.GatewayFlowCustomizedRuleContainer)object).accept(rule);  
  }  
                    }  
  
                }  
                else{  
                    String triggeredParam = "";  
 if (args.length > rule.getParamIdx()) {  
                        Object value = args[rule.getParamIdx()];  
  triggeredParam = String.valueOf(value);  
  }  
                    throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);  
  }  
            }  
        }  
    }  
  
    //......省略部分代码......
}

位于API网关后方的CopyServiceFilter负责实际请求的发送,代码如下:

@Slf4j  
public class CopyServiceFilter implements GlobalFilter, Ordered {  
  
    //......省略部分代码......
  
    @Override  
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {  
        List<String> copyServiceIds = this.getCopyServiceIds(exchange);  
 if (copyServiceIds.isEmpty()) {  
            return chain.filter(exchange);  
  }  
        exchange.getAttributes().put(COPY_SERVICE_ENABLED, true);  
 return chain.filter(exchange).then(Mono.fromRunnable(() -> {  
            for (String copyServiceId : copyServiceIds) {  
                Mono.defer(() ->  
                        execCopyServiceRequest(  
                                exchange,  
  copyServiceId,  
  ResponseVO  
                                        .builder()  
                                        .originServiceId(((Route) Objects.requireNonNull(exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR))).getUri().getHost())  
                                        .originalRequestUri(exchange.getRequest().getURI())  
                                        .originRequestBody(exchange.getAttribute(RequestResponseLoggingFilter.REQUEST_BODY))  
                                        .originResponseBody(exchange.getAttribute(RequestResponseLoggingFilter.RESPONSE_BODY))  
                                        .originResponseStatusCode(  
                                                Optional.of(exchange)  
                                                        .map(ServerWebExchange::getResponse)  
                                                        .map(ServerHttpResponse::getRawStatusCode)  
                                                        .orElse(200)  
                                        )  
                                        .originResponseHeaders(exchange.getResponse().getHeaders().toSingleValueMap())  
                                        .build()  
                        )  
                ).onErrorResume(error -> {  
                    log.warn(error.getMessage(), error);  
 return Mono.empty();  
  }).subscribe(responseVO -> {  
                    if(Boolean.TRUE.equals(exchange.getAttribute(GatewayFlowSlot.COPY_SERVICE_ID_COMPARE_KEY))){  
                        this.publishCopyServiceResponseBody(responseVO);  
  }  
                });  
  }  
        }));  
  }  
  
    private Mono<ResponseVO> execCopyServiceRequest(ServerWebExchange exchange, String copyServiceId, ResponseVO responseVO) {  
        ServerHttpRequest request = exchange.getRequest();  
  URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);  
  ServiceInstance instance = loadBalancer.choose(copyServiceId);  
 if(instance == null) {  
            log.warn("copyServiceId {} choose instance is null", copyServiceId);  
 return Mono.empty();  
  }  
        URI copyRequestUri = loadBalancer.reconstructURI(instance, requestUrl);  
  HttpMethod method = Optional.ofNullable(request.getMethod()).orElse(HttpMethod.GET);  
  HttpHeaders filteredHeaders = filterRequest(headersFiltersProvider.getIfAvailable(), exchange);  
 boolean preserveHost = exchange  
                .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);  
  WebClient.RequestBodySpec bodySpec = this.webClient.method(method).uri(copyRequestUri)  
                .headers(httpHeaders -> {  
                    httpHeaders.addAll(filteredHeaders);  
 if (!preserveHost) {  
                        httpHeaders.remove(HttpHeaders.HOST);  
  }  
                    httpHeaders.remove(HttpHeaders.ACCEPT_ENCODING);  
  httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);  
  });  
  WebClient.RequestHeadersSpec<?> headersSpec = bodySpec;  
  Object requestBody = exchange.getAttribute(RequestResponseLoggingFilter.REQUEST_BODY);  
 if (requestBody != null) {  
            headersSpec = bodySpec.bodyValue(requestBody);  
  }  
        return headersSpec  
                .exchange()  
                .map(resp -> {  
                    responseVO.setCopyResponseHeaders(resp.headers().asHttpHeaders().toSingleValueMap());  
  responseVO.setCopyResponseStatusCode(resp.rawStatusCode());  
 return resp;  
  })  
                .flatMap(resp -> resp  
                        .bodyToMono(String.class)  
                        .defaultIfEmpty("")  
                        .map(body -> responseVO.toBuilder()  
                                        .copyServiceId(copyServiceId)  
                                        .copyRequestUri(copyRequestUri)  
                                        .copyRequestBody(requestBody)  
                                        .copyResponseBody(body)  
                                        .build()  
                        )  
                )  
                .onErrorResume(copyException ->  
                        Mono.fromCallable(() ->  
                                responseVO.toBuilder()  
                                        .copyServiceId(copyServiceId)  
                                        .copyRequestUri(copyRequestUri)  
                                        .copyRequestBody(requestBody)  
                                        .copyException(copyException)  
                                        .build()  
                        )  
                )
;  
  } 
    
 //......省略部分代码......
  
}

5. 流量复制的使用方法

  • 首先定义要复制流量的具体API,也就是API分组,如下图所示:

  • 然后配置流量复制规则,将API分组/user.user-space-api.online/account/v1/accountSimple的10%的流量复制到服务名为UGC.SPACES-USER-API-READ.ONLINE的服务上,如下图所示:点击保存后,规则被编码为JSON字符串,存储到Apollo配置中心,然后推送至各个网关节点的内存中,进而被Sentinel读取并处理,实现流量的复制。

  • 复制的效果图,如下所示:

原始服务的流量图:复制服务的流量图:

6. 总结

通过把线上流量复制到UAT或者测试环境,可以使用线上的真实流量来对服务进行功能正确性的校验或者通过线上大规模的流量对服务进行压力测试,既方便又快捷。后续还可以实现把两个服务的响应收集起来,进行实时比对,进一步验证被测试服务与线上服务表现的一致性。

7. 参考文档

1.https://github.com/alibaba/Sentinel 2.https://sentinelguard.io/zh-cn/docs/introduction.html





您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存