如何基于 Redis 构建应用程序组件
本文来自作者 阿福 在 GitChat 上分享 「如何基于 Redis 构建应用程序组件」,「阅读原文」查看交流实录。
「文末高能」
编辑 | 哈比
本文基于 Redis 2.8 版本(即腾讯云和阿里云的 Redis 版本),使用 Python 2.7 作为编程语言。本文基于单实例的 Redis,不考虑 Redis 集群。
Redis 数据结构 & 命令
作为 Key-Value 的存储,Redis 的 Value 支持:字符串、链表、集合、散列表以及有序集合共计 5 种数据结构。
开始使用 Redis 构建应用程序组件之前,我们需要了解 Redis 针对不同的数据结构所提供的常用命令。
本章节仅仅是概述,完整的内容请参阅 redis.io。
字符串
Redis 的字符串即为 “字节组成的序列”,支持存储:byte string (字节串)、整数(64 位有符号整数)、浮点数(双精度浮点数)。
1. GET
key
获取 key 的值,值必须为“字符串”类型,若 key 不存在,返回 nil
2. SET
key value
设置 key 的值为 value,value 必须为“字符串” 类型
3. SETEX
key seconds value
设置 key 的值为 value,且设置 key 于 seconds 秒后 “过期”(请参阅 “Redis Key 过期时间” 章节)
4. SETNX
key value
若 key 不存在,设置 key 的值为 value,否则无操作
5. DEL
key [key …]
删除 key,支持批量删除
6. INCR
key
key 的值自增 1 并返回,若 key 不存在,将 key 的值设置为 0,再自增,值必须为“字符串”的“整数”
7. DECR
key
key 的值自减 1 并返回,若 key 不存在,将 key 的值设置为 0,再自减,值必须为“字符串”的“整数”
8. INCRBY
key increment
与 INCRBY 相似,自增的值为 increment
9. DECRBY
key decrement
与 DECRBY
相似,自减的值为 decrement
10. INCRBYFLOAT
key increment
INCRBY
的 “浮点数” 版本
说明:限于篇幅,本文没有列出 Redis “字符串” 存储 byte string (字节串)的相关命令。
链表
Redis 的链表以 “字符串” 作为元素,支持链表的 “双向” 操作:链表左端即为 “头部”,链表右端即为 “尾部”。
1. RPUSH
key value [value …]
元素添加到链表的右端,支持批量操作,返回添加后的链表长度,若 key 不存在,则创建空链表,再添加
2. LPUSH
key value [value …]
与 LPUSH 相似,元素添加到链表的左端
3. RPOP
key
移除并返回链表右端的元素,若 key 不存在或链表为空,返回 nil
4. LPOP
key
与 RPOP
相似,移除并返回链表左端的元素
5. BRPOP
key [key …] timeout
RPOP
阻塞版本,依次检查链表 key [key …],返回第一个非空链表的 key 及其右端元素,否则,Redis 连接将被阻塞(timeout 0 表示永久阻塞)
6. BLPOP
key [key …] timeout
与 BRPOP
相似,移除并返回链表左端的元素
7. RPOPLPUSH
source destination
移除 source 链表右端的元素,添加到 destination 链表的左端,并返回元素,若 source 链表为空或不存在,返回 nil,destination 链表无元素添加
8. BRPOPLPUSH
source destination timeout
RPOPLPUSH
阻塞版本,若 source 链表为空或不存在时,Redis 连接将被阻塞(timeout 0 表示永久阻塞)
9. LINDEX
key index
返回链表位于 index 位置的元素(左端开始,下标 0 开始)(-N 表示右端起的第 N 个位置),下标越界,返回 nil
10. LRANGE
key start stop
返回链表位于 start 到 stop 之间的元素,包括 start 和 stop 位置的元素,左端开始,下标定义与 LINDEX
一致
11. LTRIM
key start stop
移除链表位于 start 到 stop 之间的元素,范围与 LRANGE
相同
集合
Redis 的集合以 “字符串” 作为元素,确保元素各不相同,集合的元素是 “无序” 的。
1. SADD
key member [member …]
添加成员到集合中,支持批量操作,若 key 不存在,则创建空集,再添加,返回实际添加的数量
2. SREM
key member [member …]
移除集合中的成员,支持批量操作,返回实际移除的数量
3. SISMEMBER
key member
判断 member 是否为集合成员
4. SCARD
key
获取集合的基数(集合成员的数量)
5. SMEMBERS
key
获取集合的全部成员
6. SPOP
key
随机地移除并返回集合的一个元素,若集合不存在或为空集,返回 nil
7. SMOVE
source destination member
将成员 member 由 source 集合移动到 destination 集合,若 source 集合不存在或为空集,则无任何行为,返回实际移动的成员数量
8. SDIFF
key [key …]
获取 key [key …] 集合的差集,若集合不存在,作为空集参与计算
9. SDIFFSTORE
destination key [key …]
将 SDIFF
key [key …] 的执行结果写入 destination 集合
10. SINTER
key [key …]
获取 key [key …] 集合的交集,若集合不存在,作为空集参与计算
11. SINTERSTORE
destination key [key …]
将 SINTER
key [key …] 的执行结果写入 destination 集合
12. SUNION
key [key …]
获取 key [key …] 集合的并集,若集合不存在,作为空集参与计算
13. SUNIONSTORE
destination key [key …]
将 SUNION
key [key …] 的执行结果写入 destination 集合
散列表
Redis 的散列表作为 “无序” 的 “键值对”,确保 “键” 各不相同,以 “字符串” 作为 “值”。操作 “散列表” 的命令与 Redis 的 “字符串” 命令,语义非常相近。
1. HMGET
key field [field …]
获取散列表中键 field 对应的值,支持批量操作,若 key 不存在或 field 不存在于散列表,返回 nil
2. HMSET
field value [field value …]
添加 field -> value 的键值对到散列表,支持批量操作,若 key 不存在,则创建空散列表,再添加
3. HDEL
key field [field …]
移除散列表中 field 对应的键值对,支持批量操作,返回实际移除的数量
4. HLEN
key
获取散列表中的键值对数量
5. HEXISTS
key field
判断键 field 是否存在于散列表中
6. HKEYS
key
获取散列表中键的集合
7. HVALS
key
获取散列表中值的集合
8. HGETALL
key
获取散列表中全部的键值对
9. HINCRBY
key field increment
散列表中 field 对应的值自增 increment 并返回,若 key 不存在,则创建空的散列表,若 field 不存在于散列表中,先添加键值对 field -> 0 再自增
值必须为 “字符串” 的 “整数”
10. HINCRBYFLOAT
key field increment
HINCRBY
的 “浮点数” 版本
有序集合
相比较于散列表,Redis 的有序集合是 “有序” 的 “键值对”,确保 “键”(“集合成员”)各不相同,以 “浮点数” 作为 “值”(分数),排序由分数的大小决定(若分数相同,使用 “集合成员” 的字典序)。
1. ZADD
key score member [score member …]
将成员与分数添加到有序集合,支持批量操作,若 key 不存在,则创建空集合,再添加,若集合成员 member 已存在,则更新其分数。返回实际添加的集合成员数量
2. ZREM
key member [member …]
移除有序集合中的成员,支持批量操作,返回实际移除的数量
3. ZCARD
key
获取有序集合的基数(集合成员的数量)
4. ZINCRBY
key increment member
有序集合的成员 member 分数自增 increment,若 key 不存在,创建空集合,若 member 不属于集合成员,先添加(分数 0),再自增,返回自增后的分数
5. ZCOUNT
key min max
获取有序集合中,min <= 成员分数 <= max 的集合成员数量
min 和 max 参数支持前缀 “(”,用于表示 “开区间”
6. ZRANK
key member
获取有序集合成员 member 的排序位置,集合成员按照分数升序排序,位置下标由 0 开始
7. ZREVRANK
key member
ZRANK
的降序版本
8. ZSCORE
key member
获取有序集合成员 member 的分数,若 key 不存在或 member 不存在于有序集合,返回 nil
9. ZRANGE
key start stop
获取有序集合中,排序位置位于 start 和 stop 的集合成员,包括 start 和 stop 位置,集合成员的排序和下标与 ZRANK
一致,-N 表示尾部起第 N 个位置
10. ZREVRANGE
key start stop
ZRANGE
的降序排序版本
11. ZRANGEBYSCORE
key min max
获取有序集合中,min <= 成员分数 <= max 的集合成员,集合成员按照分数升序排序
min 和 max 参数支持前缀 “(”,用于表示 “开区间”
12. ZREVRANGEBYSCORE
key min max
ZRANGEBYSCORE
的降序排列版本
13. ZREMRANGEBYRANK
key start stop
移除 ZRANGE
key start stop 获得的集合成员,返回移除的集合成员数量
14. ZREMRANGEBYSCORE
移除 ZRANGEBYSCORE
key min max 获得的集合成员,返回移除的集合成员数量
15. ZINTERSTORE
destination numkeys key [key …] [AGGREGATE SUM|MIN|MAX]
计算 key [key …] (numkeys 标示集合数量)的交集,[AGGREGATE SUM|MIN|MAX] 标示集合成员聚合时分数的处理方式:和(默认)、最大值、最小值,计算结果存储到 destination
16. ZUNIONSTORE
destination numkeys key [key …] [AGGREGATE SUM|MIN|MAX]
ZINTERSTORE
的 “并集” 版本
其他 Redis 命令
Redis 事务
Redis 通过 MULTI
与 EXEC
命令实现基本的事务支持:
执行
MULTI
命令:开启事务执行其他命令:命令加入 “队列”
执行
EXEC
命令:依次执行 “队列” 中的命令执行
EXEC
命令前,通过DISCARD
即可取消事务(“队列” 中的命令将被丢弃)
Redis 确保:MULTI
与 EXEC
之间 “队列” 的执行是 “独立” 且 “隔离” 的操作,在此期间,Redis 不会执行任何其他的命令。
相比较于 “关系型数据库”:
EXEC
命令前,无法获得任何命令的执行结果(即:不能依赖 “队列” 中命令的执行结果决定后续行为)不支持“回滚”:即使 “队列” 出现执行失败的命令,其他命令仍然继续执行
说明:Redis 将 “发送多个命令,等待所有回复” 的行为称为 “流水线”,除了 “事务”,Redis 支持 “非事务” 的 “流水线”。
Redis Key 过期时间
Redis 支持 “过期时间”,允许一个 Redis Key 在特定的时限后 “自动被删除”。
1. EXPIRE
key seconds
设置 key 于设置的时间(秒)后过期,若过期时间为负数,则移除 key
2. PEXPIRE
key milliseconds
毫秒级的 EXPIRE
3. EXPIREAT
设置 key 过期的 Unix 时间(秒),若过期的 Unix 时间小于当前的 Unix 时间,则移除 key
4. PEXPIREAT
key milliseconds-timestamp
毫秒级 Unix 时间的 EXPIREAT
5. TTL
key
获取 key 距离过期的时间(秒),-1 表示 key 未设置过期时间,-2 表示 key 不存在
6. PTTL
key
毫秒级的 PTTL
7. PERSIST
key
移除 key 的过期时间设置。此外,删除或覆盖 key 存储内容的命令(例如:DEL
、SET
以及 *STORE 命令)亦将清除 key 的过期时间。
基于 Redis 构建应用程序组件
开始之前,我们首先构建一个 get_connection()
方法,用于获取与 Redis 的连接。
<pre> <code> def get_connection(): return redis.Redis.from_url('redis://127.0.0.1:6379/') </code> </pre>
计数器
计数器的基础需求
计数器能够用于业务统计、接口频次控制等场景,通用计数器的基础需求:
支持自定义的计数器标示符
支持自定义的计数时间窗口
实现
<pre> <code> def count(identity, count = 1, window = 1, conn = None): if conn is None: conn = get_connection() counter_key = 'counter_%d_%s' % (window, identity) counter_hash = 'hash_%d' % ((int(time.time()) / window), ) return conn.hincrby(counter_key, counter_hash, count) </code> </pre>
代码所示,选择 Redis 的散列表构建计数器:
计数器标示符(identity)和时间窗口(window)作为 Redis 的 Key
当前时间与时间窗口计算获得散列表的键,确保相同时间窗口内的时间能够获得相同的散列键
特别说明:基于 Redis 命令的原子性,确保了计数器的原子性以及计数的单调增长
日志记录
基础需求
分布式的系统中,必须提供统一的日志收集,其基础需求:
提供统一的日志收集接口,通过各个业务系统标示进行日志隔离
提供标准的日志查看接口(通过业务系统标示和数量,查看最新的日志)
实现
<pre> <code> # # 日志收集接口 # def log(app, message, conn = None): if conn is None: conn = get_connection() app_log_key = 'app_log_%s' % (app, ) return conn.rpush(app_log_key, '%s [%s] %s %s' % (app, socket.gethostname(), time.asctime(), message)) </code> </pre> <pre> <code> # # 查看日志接口 # def get_log(app, size, conn = None): if conn is None: conn = get_connection() app_log_key = 'app_log_%s' % (app, ) return conn.lrange(app_log_key, -1 - (size - 1), -1); </code> </pre>
收集的日志格式类似于:
<pre> ad-api [server] Fri Dec 1 11:30:14 2017 processing request... </pre>
代码所示,选择 Redis 的链表构建中心化的日志收集:
基于业务系统标识(app)构建 Redis 的 Key
日志收集使用 Redis 链表的
RPUSH
命令:日志添加到链表的右端日志查看使用
LRANGE
命令(注意LRANGE
的参数)
特别说明:除了
RPUSH
命令,Redis 链表同时支持LPUSH
命令,能够将数据添加到链表的左端,然而,Redis 不支持 “RRANGE
“,因此,不能使用LPUSH
命令进行日志收集。
统计分析
考虑一个 “接口调用延时” 的统计分析,其基础需求:
提供统一的 “接口调用延时” “上报” 接口:接口、调用延时
基于自定义的时间段(分钟级精度)获取 “接口调用” 分析:最大值、最小值、平均值
<pre> <code> # # “接口调用延时” 数据上报 # def report(interface, time_delay, conn = None): if conn is None: conn = get_connection() pipeline = conn.pipeline() temp_key_for_max = str(uuid.uuid4()) temp_key_for_min = str(uuid.uuid4()) raw_key = 'statistics_%s_%d' % (interface, (int(time.time()) / 60)) statistics_key_for_max = '%s_for_max' % (raw_key, ) statistics_key_for_min = '%s_for_min' % (raw_key, ) statistics_key_for_sum_count = '%s_for_sum_count' % (raw_key, ) pipeline.multi() pipeline.zadd(temp_key_for_max, 'max', time_delay) pipeline.zunionstore(statistics_key_for_max, [statistics_key_for_max, temp_key_for_max], aggregate = 'max') pipeline.zadd(temp_key_for_min, 'min', time_delay) pipeline.zunionstore(statistics_key_for_min, [statistics_key_for_min, temp_key_for_min], aggregate = 'min') pipeline.zincrby(statistics_key_for_sum_count, 'count', 1) pipeline.zincrby(statistics_key_for_sum_count, 'sum', time_delay) pipeline.delete(temp_key_for_max, temp_key_for_min) pipeline.execute() </code> </pre> <pre> <code> # # 获取 “接口调用” 数据统计 # def get_report(interface, begin_time, end_time, conn = None): if conn is None: conn = get_connection() temp_key_for_max = str(uuid.uuid4()) temp_key_for_min = str(uuid.uuid4()) temp_key_for_sum_count = str(uuid.uuid4()) keys_for_max = [] keys_for_min = [] keys_for_sum_count = [] for i in range(int(begin_time / 60), int(end_time / 60) + 1): raw_key = 'statistics_%s_%d' % (interface, i) keys_for_max.append('%s_for_max' % (raw_key, )) keys_for_min.append('%s_for_min' % (raw_key, )) keys_for_sum_count.append('%s_for_sum_count' % (raw_key, )) conn.zunionstore(temp_key_for_max, keys_for_max, aggregate = 'max') conn.zunionstore(temp_key_for_min, keys_for_min, aggregate = 'min') conn.zunionstore(temp_key_for_sum_count, keys_for_sum_count) min_time_delay = conn.zscore(temp_key_for_min, 'min') max_time_delay = conn.zscore(temp_key_for_max, 'max') sum_time_delay = conn.zscore(temp_key_for_sum_count, 'sum') count = conn.zscore(temp_key_for_sum_count, 'count') conn.delete(temp_key_for_max, temp_key_for_min, temp_key_for_sum_count) return { 'min' : min_time_delay, 'max' : max_time_delay, 'average' : float(sum_time_delay) / int(count) } </code> </pre>
代码所示,使用 Redis 的散列表构建配置中心:
开发环境、测试环境、生产环境的配置中心进行 “物理隔离”
基于业务系统标识(app)构建 Redis 的 Key,实现业务系统的配置互相隔离
散列表的 “键 - 值” 作为配置元组的 “Key-Value”
使用
HSET
命令实现配置写入使用
HGETALL
获取业务系统的全部配置
写在最后
请在阅读完本文之后,尝试思考以下问题,最终的答案我们线上交流见 ^_^
1. 本文实现的计数器,如何实现数据清理,例如:清理 1 天前的数据?
2. 本文实现的日志记录,如何实现日志数据的滚动?
3. 本文实现的分布式锁,是否可能存在漏洞?
4. 本文实现的信号量,是否可能存在漏洞?
5. 本文实现的 “消息队列”,能够变更为 LIFO 的 “写入” & “读取” 方式?
近期热文
免费福利
「阅读原文」看交流实录,你想知道的都在这里