查看原文
其他

如何基于 Redis 构建应用程序组件

2017-12-14 阿福 GitChat技术杂谈

本文来自作者 阿福 在 GitChat 上分享 「如何基于 Redis 构建应用程序组件」,「阅读原文」查看交流实录。

「文末高能」

编辑 | 哈比

本文基于 Redis 2.8 版本(即腾讯云和阿里云的 Redis 版本),使用 Python 2.7 作为编程语言。本文基于单实例的 Redis,不考虑 Redis 集群。

Redis 数据结构 & 命令

作为 Key-Value 的存储,Redis 的 Value 支持:字符串、链表、集合、散列表以及有序集合共计 5 种数据结构。

开始使用 Redis 构建应用程序组件之前,我们需要了解 Redis 针对不同的数据结构所提供的常用命令。

本章节仅仅是概述,完整的内容请参阅 redis.io。

字符串

Redis 的字符串即为 “字节组成的序列”,支持存储:byte string (字节串)、整数(64 位有符号整数)、浮点数(双精度浮点数)。

1. GET key

获取 key 的值,值必须为“字符串”类型,若 key 不存在,返回 nil

2. SET key value

设置 key 的值为 value,value 必须为“字符串” 类型

3. SETEX key seconds value

设置 key 的值为 value,且设置 key 于 seconds 秒后 “过期”(请参阅 “Redis Key 过期时间” 章节)

4. SETNX key value

若 key 不存在,设置 key 的值为 value,否则无操作

5. DEL key [key …]

删除 key,支持批量删除

6. INCR key

key 的值自增 1 并返回,若 key 不存在,将 key 的值设置为 0,再自增,值必须为“字符串”的“整数”

7. DECR key

key 的值自减 1 并返回,若 key 不存在,将 key 的值设置为 0,再自减,值必须为“字符串”的“整数”

8. INCRBY key increment

与 INCRBY 相似,自增的值为 increment

9. DECRBY key decrement

与 DECRBY 相似,自减的值为 decrement

10. INCRBYFLOAT key increment

INCRBY 的 “浮点数” 版本

说明:限于篇幅,本文没有列出 Redis “字符串” 存储 byte string (字节串)的相关命令。

链表

Redis 的链表以 “字符串” 作为元素,支持链表的 “双向” 操作:链表左端即为 “头部”,链表右端即为 “尾部”。

1. RPUSH key value [value …]

元素添加到链表的右端,支持批量操作,返回添加后的链表长度,若 key 不存在,则创建空链表,再添加

2. LPUSH key value [value …]

与 LPUSH 相似,元素添加到链表的左端

3. RPOP key

移除并返回链表右端的元素,若 key 不存在或链表为空,返回 nil

4. LPOP key

与 RPOP 相似,移除并返回链表左端的元素

5. BRPOP key [key …] timeout

RPOP 阻塞版本,依次检查链表 key [key …],返回第一个非空链表的 key 及其右端元素,否则,Redis 连接将被阻塞(timeout 0 表示永久阻塞)

6. BLPOP key [key …] timeout

与 BRPOP 相似,移除并返回链表左端的元素

7. RPOPLPUSH source destination

移除 source 链表右端的元素,添加到 destination 链表的左端,并返回元素,若 source 链表为空或不存在,返回 nil,destination 链表无元素添加

8. BRPOPLPUSH source destination timeout

RPOPLPUSH 阻塞版本,若 source 链表为空或不存在时,Redis 连接将被阻塞(timeout 0 表示永久阻塞)

9. LINDEX key index

返回链表位于 index 位置的元素(左端开始,下标 0 开始)(-N 表示右端起的第 N 个位置),下标越界,返回 nil

10. LRANGE key start stop

返回链表位于 start 到 stop 之间的元素,包括 start 和 stop 位置的元素,左端开始,下标定义与 LINDEX 一致

11. LTRIM key start stop

移除链表位于 start 到 stop 之间的元素,范围与 LRANGE 相同

集合

Redis 的集合以 “字符串” 作为元素,确保元素各不相同,集合的元素是 “无序” 的。

1. SADD key member [member …]

添加成员到集合中,支持批量操作,若 key 不存在,则创建空集,再添加,返回实际添加的数量

2. SREM key member [member …]

移除集合中的成员,支持批量操作,返回实际移除的数量

3. SISMEMBER key member

判断 member 是否为集合成员

4. SCARD key

获取集合的基数(集合成员的数量)

5. SMEMBERS key

获取集合的全部成员

6. SPOP key

随机地移除并返回集合的一个元素,若集合不存在或为空集,返回 nil

7. SMOVE source destination member

将成员 member 由 source 集合移动到 destination 集合,若 source 集合不存在或为空集,则无任何行为,返回实际移动的成员数量

8. SDIFF key [key …]

获取 key [key …] 集合的差集,若集合不存在,作为空集参与计算

9. SDIFFSTORE destination key [key …]

将 SDIFF key [key …] 的执行结果写入 destination 集合

10. SINTER key [key …]

获取 key [key …] 集合的交集,若集合不存在,作为空集参与计算

11. SINTERSTORE destination key [key …]

将 SINTER key [key …] 的执行结果写入 destination 集合

12. SUNION key [key …]

获取 key [key …] 集合的并集,若集合不存在,作为空集参与计算

13. SUNIONSTORE destination key [key …]

将 SUNION key [key …] 的执行结果写入 destination 集合

散列表

Redis 的散列表作为 “无序” 的 “键值对”,确保 “键” 各不相同,以 “字符串” 作为 “值”。操作 “散列表” 的命令与 Redis 的 “字符串” 命令,语义非常相近。

1. HMGET key field [field …]

获取散列表中键 field 对应的值,支持批量操作,若 key 不存在或 field 不存在于散列表,返回 nil

2. HMSET field value [field value …]

添加 field -> value 的键值对到散列表,支持批量操作,若 key 不存在,则创建空散列表,再添加

3. HDEL key field [field …]

移除散列表中 field 对应的键值对,支持批量操作,返回实际移除的数量

4. HLEN key

获取散列表中的键值对数量

5. HEXISTS key field

判断键 field 是否存在于散列表中

6. HKEYS key

获取散列表中键的集合

7. HVALS key

获取散列表中值的集合

8. HGETALL key

获取散列表中全部的键值对

9. HINCRBY key field increment

散列表中 field 对应的值自增 increment 并返回,若 key 不存在,则创建空的散列表,若 field 不存在于散列表中,先添加键值对 field -> 0 再自增

值必须为 “字符串” 的 “整数”

10. HINCRBYFLOAT key field increment

HINCRBY 的 “浮点数” 版本

有序集合

相比较于散列表,Redis 的有序集合是 “有序” 的 “键值对”,确保 “键”(“集合成员”)各不相同,以 “浮点数” 作为 “值”(分数),排序由分数的大小决定(若分数相同,使用 “集合成员” 的字典序)。

1. ZADD key score member [score member …]

将成员与分数添加到有序集合,支持批量操作,若 key 不存在,则创建空集合,再添加,若集合成员 member 已存在,则更新其分数。返回实际添加的集合成员数量

2. ZREM key member [member …]

移除有序集合中的成员,支持批量操作,返回实际移除的数量

3. ZCARD key

获取有序集合的基数(集合成员的数量)

4. ZINCRBY key increment member

有序集合的成员 member 分数自增 increment,若 key 不存在,创建空集合,若 member 不属于集合成员,先添加(分数 0),再自增,返回自增后的分数

5. ZCOUNT key min max

获取有序集合中,min <= 成员分数 <= max 的集合成员数量

min 和 max 参数支持前缀 “(”,用于表示 “开区间”

6. ZRANK key member

获取有序集合成员 member 的排序位置,集合成员按照分数升序排序,位置下标由 0 开始

7. ZREVRANK key member

ZRANK 的降序版本

8. ZSCORE key member

获取有序集合成员 member 的分数,若 key 不存在或 member 不存在于有序集合,返回 nil

9. ZRANGE key start stop

获取有序集合中,排序位置位于 start 和 stop 的集合成员,包括 start 和 stop 位置,集合成员的排序和下标与 ZRANK 一致,-N 表示尾部起第 N 个位置

10. ZREVRANGE key start stop

ZRANGE 的降序排序版本

11. ZRANGEBYSCORE key min max

获取有序集合中,min <= 成员分数 <= max 的集合成员,集合成员按照分数升序排序

min 和 max 参数支持前缀 “(”,用于表示 “开区间”

12. ZREVRANGEBYSCORE key min max

ZRANGEBYSCORE 的降序排列版本

13. ZREMRANGEBYRANK key start stop

移除 ZRANGE key start stop 获得的集合成员,返回移除的集合成员数量

14. ZREMRANGEBYSCORE

移除 ZRANGEBYSCORE key min max 获得的集合成员,返回移除的集合成员数量

15. ZINTERSTORE destination numkeys key [key …] [AGGREGATE SUM|MIN|MAX]

计算 key [key …] (numkeys 标示集合数量)的交集,[AGGREGATE SUM|MIN|MAX] 标示集合成员聚合时分数的处理方式:和(默认)、最大值、最小值,计算结果存储到 destination

16. ZUNIONSTORE destination numkeys key [key …] [AGGREGATE SUM|MIN|MAX]

ZINTERSTORE 的 “并集” 版本

其他 Redis 命令

Redis 事务

Redis 通过 MULTI 与 EXEC 命令实现基本的事务支持:

  1. 执行 MULTI 命令:开启事务

  2. 执行其他命令:命令加入 “队列”

  3. 执行 EXEC 命令:依次执行 “队列” 中的命令

  4. 执行 EXEC 命令前,通过 DISCARD 即可取消事务(“队列” 中的命令将被丢弃)

Redis 确保:MULTI 与 EXEC 之间 “队列” 的执行是 “独立” 且 “隔离” 的操作,在此期间,Redis 不会执行任何其他的命令。

相比较于 “关系型数据库”:

  1. EXEC命令前,无法获得任何命令的执行结果(即:不能依赖 “队列” 中命令的执行结果决定后续行为)

  2. 不支持“回滚”:即使 “队列” 出现执行失败的命令,其他命令仍然继续执行

说明:Redis 将 “发送多个命令,等待所有回复” 的行为称为 “流水线”,除了 “事务”,Redis 支持 “非事务” 的 “流水线”。

Redis Key 过期时间

Redis 支持 “过期时间”,允许一个 Redis Key 在特定的时限后 “自动被删除”。

1. EXPIRE key seconds

设置 key 于设置的时间(秒)后过期,若过期时间为负数,则移除 key

2. PEXPIRE key milliseconds

毫秒级的 EXPIRE

3. EXPIREAT

设置 key 过期的 Unix 时间(秒),若过期的 Unix 时间小于当前的 Unix 时间,则移除 key

4. PEXPIREAT key milliseconds-timestamp

毫秒级 Unix 时间的 EXPIREAT

5. TTL key

获取 key 距离过期的时间(秒),-1 表示 key 未设置过期时间,-2 表示 key 不存在

6. PTTL key

毫秒级的 PTTL

7. PERSIST key

移除 key 的过期时间设置。此外,删除或覆盖 key 存储内容的命令(例如:DELSET 以及 *STORE 命令)亦将清除 key 的过期时间。

基于 Redis 构建应用程序组件

开始之前,我们首先构建一个 get_connection() 方法,用于获取与 Redis 的连接。

<pre> <code> def get_connection(): return redis.Redis.from_url('redis://127.0.0.1:6379/') </code> </pre>

计数器

计数器的基础需求

计数器能够用于业务统计、接口频次控制等场景,通用计数器的基础需求:

  1. 支持自定义的计数器标示符

  2. 支持自定义的计数时间窗口

实现

<pre> <code> def count(identity, count = 1, window = 1, conn = None): if conn is None: conn = get_connection() counter_key = 'counter_%d_%s' % (window, identity) counter_hash = 'hash_%d' %  ((int(time.time()) / window), ) return conn.hincrby(counter_key, counter_hash, count) </code> </pre>

代码所示,选择 Redis 的散列表构建计数器:

  1. 计数器标示符(identity)和时间窗口(window)作为 Redis 的 Key

  2. 当前时间与时间窗口计算获得散列表的键,确保相同时间窗口内的时间能够获得相同的散列键

特别说明:基于 Redis 命令的原子性,确保了计数器的原子性以及计数的单调增长

日志记录

基础需求

分布式的系统中,必须提供统一的日志收集,其基础需求:

  1. 提供统一的日志收集接口,通过各个业务系统标示进行日志隔离

  2. 提供标准的日志查看接口(通过业务系统标示和数量,查看最新的日志)

实现

<pre> <code> # # 日志收集接口 # def log(app, message, conn = None): if conn is None: conn = get_connection() app_log_key = 'app_log_%s' % (app, ) return conn.rpush(app_log_key, '%s [%s] %s %s' % (app, socket.gethostname(), time.asctime(), message)) </code> </pre> <pre> <code> # # 查看日志接口 # def get_log(app, size, conn = None): if conn is None: conn = get_connection() app_log_key = 'app_log_%s' % (app, ) return conn.lrange(app_log_key, -1 - (size - 1), -1); </code> </pre>

收集的日志格式类似于:

<pre> ad-api [server] Fri Dec  1 11:30:14 2017 processing request... </pre>

代码所示,选择 Redis 的链表构建中心化的日志收集:

  1. 基于业务系统标识(app)构建 Redis 的 Key

  2. 日志收集使用 Redis 链表的 RPUSH 命令:日志添加到链表的右端

  3. 日志查看使用 LRANGE 命令(注意 LRANGE 的参数)

特别说明:除了 RPUSH 命令,Redis 链表同时支持 LPUSH 命令,能够将数据添加到链表的左端,然而,Redis 不支持 “RRANGE“,因此,不能使用 LPUSH 命令进行日志收集。

统计分析

考虑一个 “接口调用延时” 的统计分析,其基础需求:

  1. 提供统一的 “接口调用延时” “上报” 接口:接口、调用延时

  2. 基于自定义的时间段(分钟级精度)获取 “接口调用” 分析:最大值、最小值、平均值

<pre> <code> # # “接口调用延时” 数据上报 # def report(interface, time_delay, conn = None):  if conn is None:    conn = get_connection()  pipeline = conn.pipeline()  temp_key_for_max = str(uuid.uuid4())  temp_key_for_min = str(uuid.uuid4())  raw_key = 'statistics_%s_%d' % (interface, (int(time.time()) / 60))  statistics_key_for_max = '%s_for_max' % (raw_key, )  statistics_key_for_min = '%s_for_min' % (raw_key, )  statistics_key_for_sum_count = '%s_for_sum_count' % (raw_key, )  pipeline.multi()  pipeline.zadd(temp_key_for_max, 'max', time_delay)  pipeline.zunionstore(statistics_key_for_max, [statistics_key_for_max, temp_key_for_max], aggregate = 'max')  pipeline.zadd(temp_key_for_min, 'min', time_delay)  pipeline.zunionstore(statistics_key_for_min, [statistics_key_for_min, temp_key_for_min], aggregate = 'min')  pipeline.zincrby(statistics_key_for_sum_count, 'count', 1)  pipeline.zincrby(statistics_key_for_sum_count, 'sum', time_delay)  pipeline.delete(temp_key_for_max, temp_key_for_min)  pipeline.execute() </code> </pre> <pre> <code> # # 获取 “接口调用” 数据统计 # def get_report(interface, begin_time, end_time, conn = None):  if conn is None:    conn = get_connection()  temp_key_for_max = str(uuid.uuid4())  temp_key_for_min = str(uuid.uuid4())  temp_key_for_sum_count = str(uuid.uuid4())  keys_for_max = []  keys_for_min = []  keys_for_sum_count = []  for i in range(int(begin_time / 60), int(end_time / 60) + 1):    raw_key = 'statistics_%s_%d' % (interface, i)    keys_for_max.append('%s_for_max' % (raw_key, ))    keys_for_min.append('%s_for_min' % (raw_key, ))    keys_for_sum_count.append('%s_for_sum_count' % (raw_key, ))  conn.zunionstore(temp_key_for_max, keys_for_max, aggregate = 'max')  conn.zunionstore(temp_key_for_min, keys_for_min, aggregate = 'min')  conn.zunionstore(temp_key_for_sum_count, keys_for_sum_count)  min_time_delay = conn.zscore(temp_key_for_min, 'min')  max_time_delay = conn.zscore(temp_key_for_max, 'max')  sum_time_delay = conn.zscore(temp_key_for_sum_count, 'sum')  count = conn.zscore(temp_key_for_sum_count, 'count')  conn.delete(temp_key_for_max, temp_key_for_min, temp_key_for_sum_count)  return { 'min' : min_time_delay, 'max' : max_time_delay, 'average' : float(sum_time_delay) / int(count) } </code> </pre>

代码所示,使用 Redis 的散列表构建配置中心:

  1. 开发环境、测试环境、生产环境的配置中心进行 “物理隔离”

  2. 基于业务系统标识(app)构建 Redis 的 Key,实现业务系统的配置互相隔离

  3. 散列表的 “键 - 值” 作为配置元组的 “Key-Value”

  • 使用 HSET 命令实现配置写入

  • 使用 HGETALL 获取业务系统的全部配置

写在最后

请在阅读完本文之后,尝试思考以下问题,最终的答案我们线上交流见 ^_^

1. 本文实现的计数器,如何实现数据清理,例如:清理 1 天前的数据?

2. 本文实现的日志记录,如何实现日志数据的滚动?

3. 本文实现的分布式锁,是否可能存在漏洞?

4. 本文实现的信号量,是否可能存在漏洞?

5. 本文实现的 “消息队列”,能够变更为 LIFO 的 “写入” & “读取” 方式?

近期热文

深度学习在摄影技术中的应用与发展

这样做,你的面试成功率将达到 90%

如何用 TensorFlow 让一切看起来更美?

Web 安全:前端攻击 XSS 深入解析

300万粉丝,全国最大的线上抽奖平台,深度解析

免费福利

「阅读原文」看交流实录,你想知道的都在这里

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存