APP下载

spring cloud gateway 之filter、限流篇

消息来源:baojiabao.com 作者: 发布时间:2024-05-17

报价宝综合消息spring cloud gateway 之filter、限流篇

1、限流

1.1 常见的限流算法

(1)计数器算法

计数器算法采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。具体的实现可以是这样的:对于每次服务呼叫,可以通过AtomicLong#incrementAndGet()方法来给计数器加1并返回最新值,通过这个最新值和阈值进行比较。这种实现方式,相信大家都知道有一个弊端:如果我在单位时间1s内的前10ms,已经通过了100个请求,那后面的990ms,只能眼巴巴的把请求拒绝,我们把这种现象称为“突刺现象”

(2)漏桶算法

漏桶算法为了消除"突刺现象",可以采用漏桶算法实现限流,漏桶算法这个名字就很形象,算法内部有一个容器,类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变。不管服务呼叫方多么不稳定,通过漏桶算法进行限流,每10毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃。

在算法实现方面,可以准备一个伫列,用来储存请求,另外通过一个执行绪池(ScheduledExecutorService)来定期从伫列中获取请求并执行,可以一次性获取多个并发执行。

这种算法,在使用过后也存在弊端:无法应对短时间的突发流量。

(3)令牌桶算法

从某种意义上讲,令牌桶算法是对漏桶算法的一种改进,桶算法能够限制请求呼叫的速率,而令牌桶算法能够在限制呼叫的平均速率的同时还允许一定程度的突发呼叫。在令牌桶算法中,存在一个桶,用来存放固定数量的令牌。算法中存在一种机制,以一定的速率往桶中放令牌。每次请求呼叫需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝。放令牌这个动作是持续不断的进行,如果桶中令牌数达到上限,就丢弃令牌,所以就存在这种情况,桶中一直有大量的可用令牌,这时进来的请求就可以直接拿到令牌执行,比如设定qps为100,那么限流器初始化完成一秒后,桶中就已经有100个令牌了,这时服务还没完全启动好,等启动完成对外提供服务时,该限流器可以抵挡瞬时的100个请求。所以,只有桶中没有令牌时,请求才会进行等待,最后相当于以一定的速率执行。

实现思路:可以准备一个伫列,用来储存令牌,另外通过一个执行绪池定期生成令牌放到伫列中,每来一个请求,就从伫列中获取一个令牌,并继续执行。

1.2 Spring Cloud Gateway限流

在Spring Cloud Gateway中,有Filter过滤器,因此可以在“pre”型别的Filter中自行实现上述三种过滤器。但是限流作为闸道器最基本的功能,Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory这个类,适用Redis和lua指令码实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua指令码在如下图所示的资料夹中:

具体源代码不打算在这里讲述,读者可以自行检视,程式码量较少,先以案例的形式来讲解如何在Spring Cloud Gateway中使用内建的限流过滤器工厂来实现限流。

首先在工程的pom档案中引入gateway的起步依赖和redis的reactive依赖,程式码如下:

org.springframework.cloud

spring-cloud-starter-gateway

org.springframework.boot

spring-boot-starter-data-redis-reactive

在配置档案中做以下的配置:

server:

port: 8081

spring:

cloud:

gateway:

routes:

- id: limit_route

uri: http://httpbin.org:80/get

predicates:

- After=2017-01-20T17:42:47.789-07:00[America/Denver]

filters:

- name: RequestRateLimiter

args:

key-resolver: \'#{@hostAddrKeyResolver}\'

redis-rate-limiter.replenishRate: 1

redis-rate-limiter.burstCapacity: 3

application:

name: gateway-limiter

redis:

host: localhost

port: 6379

database: 0

在上面的配置档案,指定程式的埠为8081,配置了 redis的资讯,并配置了RequestRateLimiter的限流过滤器,该过滤器需要配置三个引数:

burstCapacity,令牌桶总容量。replenishRate,令牌桶每秒填充平均速率。key-resolver,用于限流的键的解析器的 Bean 物件的名字。它使用 SpEL 表示式根据#{@beanName}从 Spring 容器中获取 Bean 物件。KeyResolver需要实现resolve方法,比如根据Hostname进行限流,则需要用hostAddress去判断。实现完KeyResolver之后,需要将这个类的Bean注册到Ioc容器中。

public class HostAddrKeyResolver implements KeyResolver {

@Override

public Mono resolve(ServerWebExchange exchange) {

return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());

}

}

@Bean

public HostAddrKeyResolver hostAddrKeyResolver() {

return new HostAddrKeyResolver();

}

(1)根据uri限流

这时KeyResolver程式码如下:

public class UriKeyResolver implements KeyResolver {

@Override

public Mono resolve(ServerWebExchange exchange) {

return Mono.just(exchange.getRequest().getURI().getPath());

}

}

@Bean

public UriKeyResolver uriKeyResolver() {

return new UriKeyResolver();

}

(2)以使用者的维度限流

@Bean

KeyResolver userKeyResolver() {

return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));

}

用jmeter进行压测,配置10thread去循环请求lcoalhost:8081,循环间隔1s。从压测的结果上看到有部分请求通过,由部分请求失败。通过redis客户端去检视redis中存在的key。如下:

可见,RequestRateLimiter是使用Redis来进行限流的,并在redis中储存了2个key。关注这两个key含义可以看lua源代码。

1.3 源代码下载

https://github.com/forezp/SpringCloudLearning/tree/master/sc-f-gateway-limiter

2、filter

2.1 filter的作用和生命周期

(1)作用

当我们有很多个服务时,比如下图中的user-service、goods-service、sales-service等服务,客户端请求各个服务的Api时,每个服务都需要做相同的事情,比如鉴权、限流、日志输出等。

对于这样重复的工作,有没有办法做的更好,答案是肯定的。在微服务的上一层加一个全域性的许可权控制、限流、日志输出的Api Gatewat服务,然后再将请求转发到具体的业务服务层。这个Api Gateway服务就是起到一个服务边界的作用,外接的请求访问系统,必须先通过闸道器层。

(2)生命周期

Spring Cloud Gateway同zuul类似,有“pre”和“post”两种方式的filter。客户端的请求先经过“pre”型别的filter,然后将请求转发到具体的业务服务,比如上图中的user-service,收到业务服务的响应之后,再经过“post”型别的filter处理,最后返回响应到客户端。

与zuul不同的是,filter除了分为“pre”和“post”两种方式的filter外,在Spring Cloud Gateway中,filter从作用范围可分为另外两种,一种是针对于单个路由的gateway filter,它在配置档案中的写法同predict类似;另外一种是针对于所有路由的global gateway filer。现在从作用范围划分的维度来讲解这两种filter。

2.2 gateway filter

过滤器允许以某种方式修改传入的HTTP请求或传出的HTTP响应。过滤器可以限定作用在某些特定请求路径上。 Spring Cloud Gateway包含许多内建的GatewayFilter工厂。

GatewayFilter工厂同上一篇介绍的Predicate工厂类似,都是在配置档案application.yml中配置,遵循了约定大于配置的思想,只需要在配置档案配置GatewayFilter Factory的名称,而不需要写全部的类名,比如AddRequestHeaderGatewayFilterFactory只需要在配置档案中写AddRequestHeader,而不是全部类名。在配置档案中配置的GatewayFilter Factory最终都会相应的过滤器工厂类处理。

Spring Cloud Gateway 内建的过滤器工厂一览表如下:

现在挑几个常见的过滤器工厂来讲解,每一个过滤器工厂在官方文件都给出了详细的使用案例,如果不清楚的还可以在org.springframework.cloud.gateway.filter.factory看每一个过滤器工厂的源代码。

(1)AddRequestHeader GatewayFilter Factory

建立工程,引入相关的依赖,包括spring boot 版本2.0.5,spring Cloud版本Finchley,gateway依赖如下:

org.springframework.cloud

spring-cloud-starter-gateway

在工程的配置档案中,加入以下的配置

server:

port: 8081

spring:

profiles:

active: add_request_header_route

---

spring:

cloud:

gateway:

routes:

- id: add_request_header_route

uri: http://httpbin.org:80/get

filters:

- AddRequestHeader=X-Request-Foo, Bar

predicates:

- After=2017-01-20T17:42:47.789-07:00[America/Denver]

profiles: add_request_header_route

在上述的配置中,工程的启动埠为8081,配置档案为add_request_header_route,在add_request_header_route配置中,配置了roter的id为add_request_header_route,路由地址为http://httpbin.org:80/get,该router有AfterPredictFactory,有一个filter为AddRequestHeaderGatewayFilterFactory(约定写成AddRequestHeader),AddRequestHeader过滤器工厂会在请求头加上一对请求头,名称为X-Request-Foo,值为Bar。为了验证AddRequestHeaderGatewayFilterFactory是怎么样工作的,检视它的源代码,AddRequestHeaderGatewayFilterFactory的源代码如下:

public class AddRequestHeaderGatewayFilterFactory extends AbstractNameValueGatewayFilterFactory {

@Override

public GatewayFilter apply(NameValueConfig config) {

return (exchange, chain) -> {

ServerHttpRequest request = exchange.getRequest().mutate()

.header(config.getName(), config.getValue())

.build();

return chain.filter(exchange.mutate().request(request).build());

};

}

}

由上面的程式码可知,根据旧的ServerHttpRequest建立新的 ServerHttpRequest ,在新的ServerHttpRequest加了一个请求头,然后建立新的 ServerWebExchange ,提交过滤器链继续过滤。

启动工程,通过curl命令来模拟请求:

curl localhost:8081

最终显示了从 httpbin.org:80/get得到了请求,响应…

{

"args": {},

"headers": {

"Accept": "*/*",

"Connection": "close",

"Forwarded": "proto=http;host="localhost:8081";for="0:0:0:0:0:0:0:1:56248"",

"Host": "httpbin.org",

"User-Agent": "curl/7.58.0",

"X-Forwarded-Host": "localhost:8081",

"X-Request-Foo": "Bar"

},

"origin": "0:0:0:0:0:0:0:1, 210.22.21.66",

"url": "http://localhost:8081/get"

}

可以上面的响应可知,确实在请求头中加入了X-Request-Foo这样的一个请求头,在配置档案中配置的AddRequestHeader过滤器工厂生效。

跟AddRequestHeader过滤器工厂类似的还有AddResponseHeader过滤器工厂,在此就不再重复。

(2)RewritePath GatewayFilter Factory

在Nginx服务启中有一个非常强大的功能就是重写路径,Spring Cloud Gateway预设也提供了这样的功能,这个功能是Zuul没有的。在配置档案中加上以下的配置:

spring:

profiles:

active: rewritepath_route

---

spring:

cloud:

gateway:

routes:

- id: rewritepath_route

uri: https://blog.csdn.net

predicates:

- Path=/foo/**

filters:

- RewritePath=/foo/(?.*), /${segment}

profiles: rewritepath_route

上面的配置中,所有的/foo/**开始的路径都会命中配置的router,并执行过滤器的逻辑,在本案例中配置了RewritePath过滤器工厂,此工厂将/foo/(?.*)重写为{segment},然后转发到https://blog.csdn.net。比如在网页上请求localhost:8081/foo/forezp,此时会将请求转发到https://blog.csdn.net/forezp的页面,比如在网页上请求localhost:8081/foo/forezp/1,页面显示404,就是因为不存在https://blog.csdn.net/forezp/1这个页面。

(3)自定义过滤器

Spring Cloud Gateway内建了19种强大的过滤器工厂,能够满足很多场景的需求,那么能不能自定义自己的过滤器呢,当然是可以的。在spring Cloud Gateway中,过滤器需要实现GatewayFilter和Ordered2个界面。写一个RequestTimeFilter,程式码如下:

public class RequestTimeFilter implements GatewayFilter, Ordered {

private static final Log log = LogFactory.getLog(GatewayFilter.class);

private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());

return chain.filter(exchange).then(

Mono.fromRunnable(() -> {

Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);

if (startTime != null) {

log.info(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");

}

})

);

}

@Override

public int getOrder() {

return 0;

}

}

在上面的程式码中,Ordered中的int getOrder()方法是来给过滤器设定优先级别的,值越大则优先级越低。还有有一个filterI(exchange,chain)方法,在该方法中,先记录了请求的开始时间,并储存在ServerWebExchange中,此处是一个“pre”型别的过滤器,然后再chain.filter的内部类中的run()方法中相当于"post"过滤器,在此处打印了请求所消耗的时间。然后将该过滤器注册到router中,程式码如下:

@Bean

public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {

// @formatter:off

return builder.routes()

.route(r -> r.path("/customer/**")

.filters(f -> f.filter(new RequestTimeFilter())

.addResponseHeader("X-Response-Default-Foo", "Default-Bar"))

.uri("http://httpbin.org:80/get")

.order(0)

.id("customer_filter_router")

)

.build();

// @formatter:on

}

重启程式,通过curl命令模拟请求:

curl localhost:8081/customer/123

在程式的控制台输出一下的请求资讯的日志:

2018-11-16 15:02:20.177 INFO 20488 --- [ctor-http-nio-3] o.s.cloud.gateway.filter.GatewayFilter : /customer/123: 152ms

2.3 自定义过滤器工厂

在上面的自定义过滤器中,有没有办法自定义过滤器工厂类呢?这样就可以在配置档案中配置过滤器了。现在需要实现一个过滤器工厂,在打印时间的时候,可以设定引数来决定是否打印请引数。检视GatewayFilterFactory的源代码,可以发现GatewayFilterfactory的层级如下:

过滤器工厂的顶级界面是GatewayFilterFactory,我们可以直接继承它的两个抽象类来简化开发AbstractGatewayFilterFactory和AbstractNameValueGatewayFilterFactory,这两个抽象类的区别就是前者接收一个引数(像StripPrefix和我们建立的这种),后者接收两个引数(像AddResponseHeader)。

过滤器工厂的顶级界面是GatewayFilterFactory,有2个两个较接近具体实现的抽象类,分别为AbstractGatewayFilterFactory和AbstractNameValueGatewayFilterFactory,这2个类前者接收一个引数,比如它的实现类RedirectToGatewayFilterFactory;后者接收2个引数,比如它的实现类AddRequestHeaderGatewayFilterFactory类。现在需要将请求的日志打印出来,需要使用一个引数,这时可以参照RedirectToGatewayFilterFactory的写法。

public class RequestTimeGatewayFilterFactory extends AbstractGatewayFilterFactory {

private static final Log log = LogFactory.getLog(GatewayFilter.class);

private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";

private static final String KEY = "withParams";

@Override

public List shortcutFieldOrder() {

return Arrays.asList(KEY);

}

public RequestTimeGatewayFilterFactory() {

super(Config.class);

}

@Override

public GatewayFilter apply(Config config) {

return (exchange, chain) -> {

exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());

return chain.filter(exchange).then(

Mono.fromRunnable(() -> {

Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);

if (startTime != null) {

StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())

.append(": ")

.append(System.currentTimeMillis() - startTime)

.append("ms");

if (config.isWithParams()) {

sb.append(" params:").append(exchange.getRequest().getQueryParams());

}

log.info(sb.toString());

}

})

);

};

}

public static class Config {

private boolean withParams;

public boolean isWithParams() {

return withParams;

}

public void setWithParams(boolean withParams) {

this.withParams = withParams;

}

}

}

在上面的程式码中 apply(Config config)方法内建立了一个GatewayFilter的匿名类,具体的实现逻辑跟之前一样,只不过加了是否打印请求引数的逻辑,而这个逻辑的开关是config.isWithParams()。静态内部类类Config就是为了接收那个boolean型别的引数服务的,里边的变数名可以随意写,但是要重写List shortcutFieldOrder()这个方法。 。

需要注意的是,在类的构造器中一定要呼叫下父类的构造器把Config型别传过去,否则会报ClassCastException

最后,需要在工程的启动档案Application类中,向Srping Ioc容器注册RequestTimeGatewayFilterFactory类的Bean。

@Bean

public RequestTimeGatewayFilterFactory elapsedGatewayFilterFactory() {

return new RequestTimeGatewayFilterFactory();

}

然后可以在配置档案中配置如下:

spring:

profiles:

active: elapse_route

---

spring:

cloud:

gateway:

routes:

- id: elapse_route

uri: http://httpbin.org:80/get

filters:

- RequestTime=false

predicates:

- After=2017-01-20T17:42:47.789-07:00[America/Denver]

profiles: elapse_route

启动工程,在浏览器上访问localhost:8081?name=forezp,可以在控制台上看到,日志输出了请求消耗的时间和请求引数。

2.4 global filter

Spring Cloud Gateway根据作用范围划分为GatewayFilter和GlobalFilter,二者区别如下:

GatewayFilter : 需要通过spring.cloud.routes.filters 配置在具体路由下,只作用在当前路由上或通过spring.cloud.default-filters配置在全域性,作用在所有路由上GlobalFilter : 全域性过滤器,不需要在配置档案中配置,作用在所有的路由上,最终通过GatewayFilterAdapter包装成GatewayFilterChain可识别的过滤器,它为请求业务以及路由的URI转换为真实业务服务的请求地址的核心过滤器,不需要配置,系统初始化时载入,并作用在每个路由上。Spring Cloud Gateway框架内建的GlobalFilter如下:

上图中每一个GlobalFilter都作用在每一个router上,能够满足大多数的需求。但是如果遇到业务上的定制,可能需要编写满足自己需求的GlobalFilter。在下面的案例中将讲述如何编写自己GlobalFilter,该GlobalFilter会校验请求中是否包含了请求引数“token”,如何不包含请求引数“token”则不转发路由,否则执行正常的逻辑。程式码如下:

public class TokenFilter implements GlobalFilter, Ordered {

Logger logger=LoggerFactory.getLogger( TokenFilter.class );

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

String token = exchange.getRequest().getQueryParams().getFirst("token");

if (token == null || token.isEmpty()) {

logger.info( "token is empty..." );

exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);

return exchange.getResponse().setComplete();

}

return chain.filter(exchange);

}

@Override

public int getOrder() {

return -100;

}

}

在上面的TokenFilter需要实现GlobalFilter和Ordered界面,这和实现GatewayFilter很类似。然后根据ServerWebExchange获取ServerHttpRequest,然后根据ServerHttpRequest中是否含有引数token,如果没有则完成请求,终止转发,否则执行正常的逻辑。

然后需要将TokenFilter在工程的启动类中注入到Spring Ioc容器中,程式码如下:

@Bean

public TokenFilter tokenFilter(){

return new TokenFilter();

}

启动工程,使用curl命令请求:

curl localhost:8081/customer/123

可以看到请没有被转发,请求被终止,并在控制台打印了如下日志:

2018-11-16 15:30:13.543 INFO 19372 --- [ctor-http-nio-2] gateway.TokenFilter : token is empty...

上面的日志显示了请求进入了没有传“token”的逻辑。

2.5 源代码下载

https://github.com/forezp/SpringCloudLearning/tree/master/sc-f-gateway-predicate

更多资料展示如下,需要的小伙伴们转发,私信口令:资料,即可获得以下资料!!!

2020-01-17 09:51:00

相关文章