查看原文
其他

Redis实现的分布式锁和分布式限流

黄青石 Java面试 2020-10-17

随着现在分布式越来越普遍,分布式锁也十分常用,上篇文章解释了使用zookeeper实现分布式锁,本次咱们说一下如何用Redis实现分布式锁和分布限流。

https://www.cnblogs.com/huangqingshi/p/9650837.html

Redis有个事务锁,就是如下的命令,这个命令的含义是将一个value设置到一个key中,如果不存在将会赋值并且设置超时时间为30秒,如何这个key已经存在了,则不进行设置。

  1. SET key value NX PX 30000

这个事务锁很好的解决了两个单独的命令,一个设置set key value nx,即该key不存在的话将对其进行设置,另一个是expire key seconds,设置该key的超时时间。我们可以想一下,如果这两个命令用程序单独使用会存在什么问题:

  • 如果一个set key的命令设置了key,然后程序异常了,expire时间没有设置,那么这个key会一直锁住。

  • 如果一个set key时出现了异常,但是直接执行了expire,过了一会儿之后另一个进行set key,还没怎么执行代码,结果key过期了,别的线程也进入了锁。

还有很多出问题的可能点,这里我们就不讨论了,下面咱们来看看如何实现吧。

本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua脚本。在实现的过程中遇到了很多问题,都一一解决实现了。

依赖的POM文件如下:

  1. <?xml version="1.0" encoding="UTF-8"?>

  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  3.         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  4.    <modelVersion>4.0.0</modelVersion>

  5.    <parent>

  6.        <groupId>org.springframework.boot</groupId>

  7.        <artifactId>spring-boot-starter-parent</artifactId>

  8.        <version>2.1.2.RELEASE</version>

  9.        <relativePath/> <!-- lookup parent from repository -->

  10.    </parent>

  11.    <groupId>com.hqs</groupId>

  12.    <artifactId>distributedlock</artifactId>

  13.    <version>0.0.1-SNAPSHOT</version>

  14.    <name>distributedlock</name>

  15.    <description>Demo project for Spring Boot</description>


  16.    <properties>

  17.        <java.version>1.8</java.version>

  18.    </properties>


  19.    <dependencies>

  20.        <dependency>

  21.            <groupId>org.springframework.boot</groupId>

  22.            <artifactId>spring-boot-starter-aop</artifactId>

  23.        </dependency>

  24.        <dependency>

  25.            <groupId>org.springframework.boot</groupId>

  26.            <artifactId>spring-boot-starter-web</artifactId>

  27.        </dependency>

  28.        <dependency>

  29.            <groupId>org.springframework.boot</groupId>

  30.            <artifactId>spring-boot-starter-data-redis</artifactId>

  31.        </dependency>

  32.        <dependency>

  33.            <groupId>org.springframework.boot</groupId>

  34.            <artifactId>spring-boot-devtools</artifactId>

  35.            <scope>runtime</scope>

  36.        </dependency>

  37.        <dependency>

  38.            <groupId>org.projectlombok</groupId>

  39.            <artifactId>lombok</artifactId>

  40.            <optional>true</optional>

  41.        </dependency>

  42.        <dependency>

  43.            <groupId>org.springframework.boot</groupId>

  44.            <artifactId>spring-boot-starter-test</artifactId>

  45.            <scope>test</scope>

  46.        </dependency>

  47.        <dependency>

  48.            <groupId>io.springfox</groupId>

  49.            <artifactId>springfox-swagger-ui</artifactId>

  50.            <version>2.9.2</version>

  51.        </dependency>

  52.        <dependency>

  53.            <groupId>io.springfox</groupId>

  54.            <artifactId>springfox-swagger2</artifactId>

  55.            <version>2.9.2</version>

  56.            <scope>compile</scope>

  57.        </dependency>

  58.        <dependency>

  59.            <groupId>redis.clients</groupId>

  60.            <artifactId>jedis</artifactId>

  61.            <version>2.9.0</version>

  62.        </dependency>

  63.    </dependencies>


  64.    <build>

  65.        <plugins>

  66.            <plugin>

  67.                <groupId>org.springframework.boot</groupId>

  68.                <artifactId>spring-boot-maven-plugin</artifactId>

  69.            </plugin>

  70.        </plugins>

  71.    </build>


  72. </project>

使用了两个lua脚本,一个用于执行lock,另一个执行unlock。

咱们简单看一下,lock脚本就是采用Redis事务执行的set nx px命令,其实还有set nx ex命令,这个ex命令是采用秒的方式进行设置过期时间,这个px是采用毫秒的方式设置过期时间。

value需要使用一个唯一的值,这个值在解锁的时候需要判断是否一致,如果一致的话就进行解锁。这个也是官方推荐的方法。另外在lock的地方我设置了一个result,用于输出测试时的结果,这样就可以结合程序去进行debug了。

  1. local expire = tonumber(ARGV[2])

  2. local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire)

  3. local strret = tostring(ret)

  4. //用于查看结果,我本机获取锁成功后程序返回随机结果"table: 0x7fb4b3700fe0",否则返回"false"

  5. redis.call('set', 'result', strret)

  6. if strret == 'false' then

  7.    return false

  8. else

  9.    return true

  10. end


  1. redis.call('del', 'result')

  2. if redis.call('get', KEYS[1]) == ARGV[1] then

  3.    return redis.call('del', KEYS[1])

  4. else

  5.    return 0

  6. end

来看下代码,主要写了两个方法,一个是用与锁另外一个是用于结解锁。这块需要注意的是使用RedisTemplate,这块意味着key和value一定都是String的,我在使用的过程中就出现了一些错误。首先初始化两个脚本到程序中,然后调用执行脚本。,>

  1. package com.hqs.distributedlock.lock;



  2. import lombok.extern.slf4j.Slf4j;

  3. import org.springframework.beans.factory.annotation.Autowired;

  4. import org.springframework.data.redis.core.RedisTemplate;

  5. import org.springframework.data.redis.core.script.RedisScript;

  6. import org.springframework.stereotype.Component;


  7. import java.util.Collections;


  8. @Slf4j

  9. @Component

  10. public class DistributedLock {


  11.    //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的

  12.    @Autowired

  13.    private RedisTemplate<String, String> redisTemplate;


  14.    @Autowired

  15.    RedisScript<Boolean> lockScript;


  16.    @Autowired

  17.    RedisScript<Long> unlockScript;


  18.    public Boolean distributedLock(String key, String uuid, String secondsToLock) {

  19.        Boolean locked = false;

  20.        try {

  21.            String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000);

  22.            locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds);

  23.            log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}",

  24.                    key, uuid, secondsToLock, locked, millSeconds);

  25.        } catch (Exception e) {

  26.            log.error("error", e);

  27.        }

  28.        return locked;

  29.    }


  30.    public void distributedUnlock(String key, String uuid) {

  31.        Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key),

  32.                uuid);

  33.        log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked);


  34.    }


  35. }

还有一个就是脚本定义的地方需要注意,返回的结果集一定是Long, Boolean,List, 一个反序列化的值。这块要注意。

  1. package com.hqs.distributedlock.config;



  2. import com.sun.org.apache.xpath.internal.operations.Bool;

  3. import lombok.extern.slf4j.Slf4j;

  4. import org.springframework.beans.factory.annotation.Qualifier;

  5. import org.springframework.context.annotation.Bean;

  6. import org.springframework.context.annotation.Configuration;

  7. import org.springframework.core.io.ClassPathResource;

  8. import org.springframework.data.redis.core.script.DefaultRedisScript;

  9. import org.springframework.data.redis.core.script.RedisScript;

  10. import org.springframework.scripting.ScriptSource;

  11. import org.springframework.scripting.support.ResourceScriptSource;



  12. @Configuration

  13. @Slf4j

  14. public class BeanConfiguration {


  15.    /**

  16.     * The script resultType should be one of

  17.     * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns

  18.     * a throw-away status (specifically, OK).

  19.     * @return

  20.     */

  21.    @Bean

  22.    public RedisScript<Long> limitScript() {

  23.        RedisScript redisScript = null;

  24.        try {

  25.            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"));

  26. //            log.info("script:{}", scriptSource.getScriptAsString());

  27.            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);

  28.        } catch (Exception e) {

  29.            log.error("error", e);

  30.        }

  31.        return redisScript;


  32.    }


  33.    @Bean

  34.    public RedisScript<Boolean> lockScript() {

  35.        RedisScript<Boolean> redisScript = null;

  36.        try {

  37.            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua"));

  38.            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class);

  39.        } catch (Exception e) {

  40.            log.error("error" , e);

  41.        }

  42.        return redisScript;

  43.    }


  44.    @Bean

  45.    public RedisScript<Long> unlockScript() {

  46.        RedisScript<Long> redisScript = null;

  47.        try {

  48.            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua"));

  49.            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);

  50.        } catch (Exception e) {

  51.            log.error("error" , e);

  52.        }

  53.        return redisScript;

  54.    }



  55.    @Bean

  56.    public RedisScript<Long> limitAnother() {

  57.        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();

  58.        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")));

  59.        redisScript.setResultType(Long.class);

  60.        return redisScript;

  61.    }


  62. }

好了,这块就写好了,然后写好controller类准备测试。

  1. @PostMapping("/distributedLock")

  2.    @ResponseBody

  3.    public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{

  4. //        String uuid = UUID.randomUUID().toString();

  5.        Boolean locked = false;

  6.        try {

  7.            locked = lock.distributedLock(key, uuid, secondsToLock);

  8.            if(locked) {

  9.                log.info("userId:{} is locked - uuid:{}", userId, uuid);

  10.                log.info("do business logic");

  11.                TimeUnit.MICROSECONDS.sleep(3000);

  12.            } else {

  13.                log.info("userId:{} is not locked - uuid:{}", userId, uuid);

  14.            }

  15.        } catch (Exception e) {

  16.            log.error("error", e);

  17.        } finally {

  18.            if(locked) {

  19.                lock.distributedUnlock(key, uuid);

  20.            }

  21.        }


  22.        return "ok";

  23.    }

我也写了一个测试类,用于测试和输出结果, 使用100个线程,然后锁的时间设置10秒,controller里边需要休眠3秒模拟业务执行。

  1. @Test

  2.    public void distrubtedLock() {

  3.        String url = "http://localhost:8080/distributedLock";

  4.        String uuid = "abcdefg";

  5. //        log.info("uuid:{}", uuid);

  6.        String key = "redisLock";

  7.        String secondsToLive = "10";


  8.        for(int i = 0; i < 100; i++) {

  9.            final int userId = i;

  10.            new Thread(() -> {

  11.                MultiValueMap<String, String> params = new LinkedMultiValueMap<>();

  12.                params.add("uuid", uuid);

  13.                params.add("key", key);

  14.                params.add("secondsToLock", secondsToLive);

  15.                params.add("userId", String.valueOf(userId));

  16.                String result = testRestTemplate.postForObject(url, params, String.class);

  17.                System.out.println("-------------" + result);

  18.            }

  19.            ).start();

  20.        }


  21.    }

获取锁的地方就会执行do business logic, 然后会有部分线程获取到锁并执行业务,执行完业务的就会释放锁。

分布式锁就实现好了,接下来实现分布式限流。先看一下limit的lua脚本,需要给脚本传两个值,一个值是限流的key,一个值是限流的数量。

获取当前key,然后判断其值是否为nil,如果为nil的话需要赋值为0,然后进行加1并且和limit进行比对,如果大于limt即返回0,说明限流了,如果小于limit则需要使用Redis的INCRBY key 1,就是将key进行加1命令。并且设置超时时间,超时时间是秒,并且如果有需要的话这个秒也是可以用参数进行设置。

  1. //lua 下标从 1 开始

  2. // 限流 key

  3. local key = KEYS[1]

  4. //限流大小

  5. local limit = tonumber(ARGV[1])


  6. // 获取当前流量大小

  7. local curentLimit = tonumber(redis.call('get', key) or "0")


  8. if curentLimit + 1 > limit then

  9.    // 达到限流大小 返回

  10.    return 0;

  11. else

  12.    // 没有达到阈值 value + 1

  13.    redis.call("INCRBY", key, 1)

  14.    // EXPIRE后边的单位是秒

  15.    redis.call("EXPIRE", key, 10)

  16.    return curentLimit + 1

  17. end


执行limit的脚本和执行lock的脚本类似。

  1. package com.hqs.distributedlock.limit;


  2. import lombok.extern.slf4j.Slf4j;

  3. import org.springframework.beans.factory.annotation.Autowired;

  4. import org.springframework.data.redis.core.RedisTemplate;

  5. import org.springframework.data.redis.core.script.RedisScript;

  6. import org.springframework.stereotype.Component;


  7. import java.util.Collections;


  8. /**

  9. * @author huangqingshi

  10. * @Date 2019-01-17

  11. */

  12. @Slf4j

  13. @Component

  14. public class DistributedLimit {


  15.    //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的

  16.    @Autowired

  17.    private RedisTemplate<String, String> redisTemplate;



  18.    @Autowired

  19.    RedisScript<Long> limitScript;


  20.    public Boolean distributedLimit(String key, String limit) {

  21.        Long id = 0L;


  22.        try {

  23.            id = redisTemplate.execute(limitScript, Collections.singletonList(key),

  24.                    limit);

  25.            log.info("id:{}", id);

  26.        } catch (Exception e) {

  27.            log.error("error", e);

  28.        }


  29.        if(id == 0L) {

  30.            return false;

  31.        } else {

  32.            return true;

  33.        }

  34.    }


  35. }

接下来咱们写一个限流注解,并且设置注解的key和限流的大小:

  1. package com.hqs.distributedlock.annotation;


  2. import java.lang.annotation.ElementType;

  3. import java.lang.annotation.Retention;

  4. import java.lang.annotation.RetentionPolicy;

  5. import java.lang.annotation.Target;


  6. /**

  7. * 自定义limit注解

  8. * @author huangqingshi

  9. * @Date 2019-01-17

  10. */

  11. @Target(ElementType.METHOD)

  12. @Retention(RetentionPolicy.RUNTIME)

  13. public @interface DistriLimitAnno {

  14.    public String limitKey() default "limit";

  15.    public int limit() default 1;

  16. }

然后对注解进行切面,在切面中判断是否超过limit,如果超过limit的时候就需要抛出异常exceeded limit,否则正常执行。

  1. package com.hqs.distributedlock.aspect;


  2. import com.hqs.distributedlock.annotation.DistriLimitAnno;

  3. import com.hqs.distributedlock.limit.DistributedLimit;

  4. import lombok.extern.slf4j.Slf4j;

  5. import org.aspectj.lang.JoinPoint;

  6. import org.aspectj.lang.annotation.Aspect;

  7. import org.aspectj.lang.annotation.Before;

  8. import org.aspectj.lang.annotation.Pointcut;

  9. import org.aspectj.lang.reflect.MethodSignature;

  10. import org.springframework.beans.factory.annotation.Autowired;

  11. import org.springframework.context.annotation.EnableAspectJAutoProxy;

  12. import org.springframework.stereotype.Component;


  13. import java.lang.reflect.Method;


  14. /**

  15. * @author huangqingshi

  16. * @Date 2019-01-17

  17. */

  18. @Slf4j

  19. @Aspect

  20. @Component

  21. @EnableAspectJAutoProxy(proxyTargetClass = true)

  22. public class LimitAspect {


  23.    @Autowired

  24.    DistributedLimit distributedLimit;


  25.    @Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)")

  26.    public void limit() {};


  27.    @Before("limit()")

  28.    public void beforeLimit(JoinPoint joinPoint) throws Exception {

  29.        MethodSignature signature = (MethodSignature) joinPoint.getSignature();

  30.        Method method = signature.getMethod();

  31.        DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class);

  32.        String key = distriLimitAnno.limitKey();

  33.        int limit = distriLimitAnno.limit();

  34.        Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit));

  35.        if(!exceededLimit) {

  36.            throw new RuntimeException("exceeded limit");

  37.        }

  38.    }


  39. }

因为有抛出异常,这里我弄了一个统一的controller错误处理,如果controller出现Exception的时候都需要走这块异常。如果是正常的RunTimeException的时候获取一下,否则将异常获取一下并且输出。

  1. package com.hqs.distributedlock.util;


  2. import lombok.extern.slf4j.Slf4j;

  3. import org.springframework.http.HttpStatus;

  4. import org.springframework.web.bind.annotation.ControllerAdvice;

  5. import org.springframework.web.bind.annotation.ExceptionHandler;

  6. import org.springframework.web.bind.annotation.ResponseBody;

  7. import org.springframework.web.bind.annotation.ResponseStatus;

  8. import org.springframework.web.context.request.NativeWebRequest;


  9. import javax.servlet.http.HttpServletRequest;

  10. import java.util.HashMap;

  11. import java.util.Map;


  12. /**

  13. * @author huangqingshi

  14. * @Date 2019-01-17

  15. * 统一的controller错误处理

  16. */

  17. @Slf4j

  18. @ControllerAdvice

  19. public class UnifiedErrorHandler {

  20.    private static Map<String, String> res = new HashMap<>(2);


  21.    @ExceptionHandler(value = Exception.class)

  22.    @ResponseStatus(HttpStatus.OK)

  23.    @ResponseBody

  24.    public Object processException(HttpServletRequest req, Exception e) {

  25.        res.put("url", req.getRequestURL().toString());


  26.        if(e instanceof RuntimeException) {

  27.            res.put("mess", e.getMessage());

  28.        } else {

  29.            res.put("mess", "sorry error happens");

  30.        }

  31.        return res;

  32.    }


  33. }

好了,接下来将注解写到自定义的controller上,limit的大小为10,也就是10秒钟内限制10次访问。

  1. @PostMapping("/distributedLimit")

  2.    @ResponseBody

  3.    @DistriLimitAnno(limitKey="limit", limit = 10)

  4.    public String distributedLimit(String userId) {

  5.        log.info(userId);

  6.        return "ok";

  7.    }

也是来一段Test方法来跑,老方式100个线程开始跑,只有10次,其他的都是limit。没有问题。

总结一下,这次实现采用了使用lua脚本和Redis实现了锁和限流,但是真实使用的时候还需要多测试,另外如果此次Redis也是采用的单机实现方法,使用集群的时候可能需要改造一下。

关于锁这块其实Reids自己也实现了RedLock, java实现的版本Redission。也有很多公司使用了,功能非常强大。各种场景下都用到了。


扩展阅读

利用Redis实现分布式锁

从分布式一致性谈到CAP理论、BASE理论

Redis 分布式锁:乐观锁的实现,以秒杀系统为例

从构建分布式秒杀系统聊聊限流特技

大型网站限流算法的实现和改造


来源:https://www.cnblogs.com/huangqingshi/p/10290615.html

文章来源网络,版权归作者本人所有,如侵犯到原作者权益,请与我们联系删除

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存