目录
- 什么是 Redis
- 前置知识
- String
- 介绍
- 常用命令
- 使用场景
- 底层实现
- SDS 结构体
- List
- 介绍
- 常用命令
- 使用场景
- 底层实现
- ZipList
- QuickList
- Hash
- 介绍
- 常用命令
- 使用场景
- 底层实现
- Dict
- Dict 的 rehash
- Set
- 介绍
- 常用命令
- 使用场景
- 底层实现
- Intset
- ZSet
- 介绍
- 常用命令
- 使用场景
- 底层实现
- SkipList
什么是 Redis
Redis 是基于内存的 K-V 数据库,常用于缓存、消息队列,分布式锁等场景,并且提供了常见的数据结构:字符串、哈希、列表、集合、带排序的集合
Redis 数据类型详解
前置知识
Redis中的任意数据类型的键和值都会被封装为一个 RedisObject
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
type
:指具体的对象类型
encoding
:底层编码方式
lru
:记录最后一次被访问的时间
refcount
:对象引用计数器,计数器为 0 时表示没有被引用,可以回收
ptr
:实际存储的值
五种数据结构
Redis中会根据存储的数据类型不同,选择不同的编码方式。每种数据类型的使用的编码方式如下:
数据类型 | 编码方式 |
OBJ_STRING | int、embstr、raw |
OBJ_LIST | LinkedList和ZipList(3.2以前)、QuickList(3.2以后) |
OBJ_SET | intset、HT |
OBJ_ZSET | ZipList、HT、SkipList |
OBJ_HASH | ZipList、HT |
Redis 的编码方式
Redis 中会根据存储的数据类型不同,选择不同的编码方式,共包含 11 种不同类型:
编号 | 编码方式 | 说明 |
0 | OBJ_ENCODING_RAW | raw编码动态字符串 |
1 | OBJ_ENCODING_INT | long类型的整数的字符串 |
2 | OBJ_ENCODING_HT | hash表(字典dict) |
3 | OBJ_ENCODING_ZIPMAP | 已废弃 |
4 | OBJ_ENCODING_LINKEDLIST | 双端链表 |
5 | OBJ_ENCODING_ZIPLIST | 压缩列表 |
6 | OBJ_ENCODING_INTSET | 整数集合 |
7 | OBJ_ENCODING_SKIPLIST | 跳表 |
8 | OBJ_ENCODING_EMBSTR | embstr的动态字符串 |
9 | OBJ_ENCODING_QUICKLIST | 快速列表 |
10 | OBJ_ENCODING_STREAM | Stream流 |
String
介绍
字符串 (string) 其实是最简单的数据结构,不仅可以存储字符串,还可以存储整数、浮点数、二进制数据等;
常用命令
set key value
:添加一个 string 类型的键值对
get key
:获取 key 对应的 value
mset
:批量设置对个键值对
mget
:批量获取多个键值对
incr
:让一个整形自增 1
setnx
:添加一个 string 类型的字符串,前提是这个 key 不存在
使用场景
缓存对象
直接缓存整个 json 字符串对象 SET user:1 '{"name":"ezreal", "age":18}'
缓存更新策略
- 使用 redis 的内存淘汰机制;
- 超时剔除,给数据添加 TTL 过期时间;
- 客户端主动更新缓存;
分布式锁
使用 setnx 指令可以实现分布式锁,若设置值成功则表示获取锁成功,设置失败则获取锁失败,注意要设置该 key 的过期时间,防止一直抢占该锁
常规计数
可以通过 incr
指令来统计数量,前提设置的 value 需要是整形
共享 session
我们可以把多台服务器上的 session 统一放到 redis 中,无论向集群哪台服务器发送消息,都保证去 redis 中获取 session 的信息
底层实现
redis 的底层实现是 SDS 字符串
SDS 结构体
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* buf已保存的字符串字节数,不包含结束标示 */
uint8_t alloc; /* buf申请的总的字节数,不包含结束标示 */
unsigned char flags; /* 不同的SDS头类型 */
char buf[];
};
len
:该字符串的 长度
alloc
:buf 申请的 总字节数
buf
:实际存储数据的位置
flag
:SDS 头类型,用来控制SDS头大小 :SDS_TYPE_5
,SDS_TYPE_8
,SDS_TYPE_16
,SDS_TYPE_32
,SDS_TYPE_64
整形的存储
如果存储的是一个整形,则则直接存储在 RedisObject 中即可;
SDS 动态扩容
假如现在在 name 字符串后加入 ezreal,形成新的字符串 'nameezreal'
- 若新字符串的长度小于 1M,则新的内存空间为 新字符串 的长度的两倍 + 1;
- 若新字符串的长度大于 1M,则新的内存空间为 新字符串 的长度 + 1M + 1,称为内存预分配;
List
介绍
列表是一个有序的字符串集合,可以通过下标索引来访问指定位置的字符串,还可以在列表头部或者尾部来添加或删除元素
常用命令
LPUSH KEY
:向列表左侧插入元素
RPUSH KEY
:向列表右侧插入元素
LPOP KEY
:向列表左侧弹出元素
RPOP KEY
:向列表右侧弹出元素
LRANGE KEY star end
:获取指定范围内的 key
BLPOP / BRPOP
:与 LPOP / RPOP 类似,只不过在没有元素时会等待
使用场景
消息队列
消息队列实现的三大要素:消息保序,处理重复的消息,保证消息的可靠性
- 消息保序
我们可以基于 LPUSH / RPOP 的方法来实现一个队列,消息先进先出,保证消息的顺序;
当有一个缺点,消费者只能不断的轮询该队列是否有消息,因为消费者新增一条消息后是不能通知消费者的,所以消费者要不断轮询 while(1)
,不断的占用 CPU。我们可以使用 BRPOP
来优化,消费者没有获取到消息时,会自动阻塞,直到生产者放入新的消息。
- 处理重复的消息
我们可以为每个消息生成一个 唯一 的消息 ID,每次消费消息时,从数据库中判断该消息是否已经被处理了,防止消息的重复消息。
- 保证消息可靠性
当消息被消费者获取后,没有处理完消息,机器就宕机了,这样不仅消息没有完全被消费,List 中的消息也丢失了,从而导致这个任务永远丢失了;
我们可以使用 Redis 中的BRPOPLPUSH
命令,作用是把该消息从 List 中拿出来,同时还会把该消息放到另一个 List 中作留存。
底层实现
在 Redis 3.2 之前,List 的底层实现是压缩列表 ( ZipList
) 或 双向链表
在 Redis 3.2 之后,List 的底层实现是 QuickList
ZipList
ZipList 又叫压缩列表,类似 "双向链表",但其不是链表。它由一段连续的内存块组成,可以在列表头部或者尾部添加或者删除元素。
zlbytes
:整个 ZipList 的 内存字节数,占用 4 字节
zltail
:从 ZipList 的起始地址到最后一个 entry 的内存字节数,占用 4 字节
zllen
:ZipList 中 entry 的个数,占用 2 字节
zlend
:结束标识 0xff
,占用 1 字节
ZipEntry结构
previous_entry_length
:上一个 entry 的长度,占用 1个 或 5个字节
- 若上一个 entry 的长度小于 256 字节,则 encoding 使用 1 个字节来存储
- 若上一个 entry 的长度大于 256 字节,则 encoding 使用 5 个字节来存储
encoding
:记录 content 的类型 [ 整形 or 字符串 ] 和长度,占用 1个,2个 或者 5个字节
content
:实际的内容
Encoding 编码
Entry 中的 encoding 编码分为字符串和整数两种:
字符串:如果 encoding 是以“00”“01”或者“10”开头,则证明 content 是字符串
整数:如果encoding是以“11”开始,则证明content是整数,且encoding固定只占用1个字节
编码 | 编码长度 | 整数类型 |
11000000 | 1 | int16_t(2 bytes) |
11010000 | 1 | int32_t(4 bytes) |
11100000 | 1 | int64_t(8 bytes) |
11110000 | 1 | 24位有符整数(3 bytes) |
11111110 | 1 | 8位有符整数(1 bytes) |
1111xxxx | 1 | 直接在xxxx位置保存数值,范围从0001~1101,减1后结果为实际值 |
可见,ZipList 通过记录上一个entry 的长度 和 自身entry 的长度(length + encoding + content),从而可以快速得到上一个 entry 和下一个 entry ,从而实现从头到尾 或 从尾到头的遍历。
QuickList
QuickList 是一个双向链表,它的每一个节点都是 ZipList
为了避免 QuickList 中的每个 ZipList 中 entry 过多,Redis 提供了一个配置项:list-max-ziplist-size
来限制。
如果值为正,则代表 ZipList 的允许的 entry 个数的最大值;
如果值为负,则代表ZipList的最大内存大小,分5种情况:
- -1:每个 ZipList 的内存占用不能超过 4kb
- -2:每个 ZipList 的内存占用不能超过 8kb
- -3:每个 ZipList 的内存占用不能超过 16kb
- -4:每个 ZipList 的内存占用不能超过 32kb
- -5:每个 ZipList 的内存占用不能超过 64kb
其默认值为 -2:
以下是 QuickList 的和 QuickListNode 的结构源码:
QuickList
typedef struct quicklist {
quicklistNode *head; // 头结点指针
quicklistNode *tail; // 尾结点指针
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
QuickListNode
typedef struct quicklistNode {
struct quicklistNode *prev; // 前一个指针
struct quicklistNode *next; // 后一个指针
unsigned char *zl; // 当前结点的ziplist指针
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
整体结构
Hash
介绍
Hash 是一个键值对的集合,形如:key:{field1: value1,field1: value1}
,非常适合存储一个对象
常用命令
# 存储一个哈希表key的键值
HSET key field value
# 获取哈希表key对应的field键值
HGET key field
# 在一个哈希表key中存储多个键值对
HMSET key field value [field value...]
# 批量获取哈希表key中多个field键值
HMGET key field [field ...]
# 删除哈希表key中的field键值
HDEL key field [field ...]
# 返回哈希表key中field的数量
HLEN key
# 返回哈希表key中所有的键值
HGETALL key
# 为哈希表key中field键的值加上增量n
HINCRBY key field n
使用场景
缓存对象
例如下图,对于使用 string + json
还是用 hash
来存储,建议优先选择 string + json
,对于某些字段经常改变的话,可以考虑使用 hash
购物车
以用户 id 为 key,商品 id 为 field,商品数量为 value,恰好构成了购物车的 3 个要素,如下图所示。
涉及的命令如下:
- 添加商品:HSET cart:{用户id} {商品id} 1
- 添加数量:HINCRBY cart:{用户id} {商品id} 1
- 商品总数:HLEN cart:{用户id}
- 删除商品:HDEL cart:{用户id} {商品id}
- 获取购物车所有商品:HGETALL cart:{用户id}
当前仅仅是将商品 ID 存储到了 Redis 中,在回显商品具体信息的时候,还需要拿着商品 id 查询一次数据库,获取完整的商品的信息。
底层实现
Hash 的底层实现可以是 ZipList 或者 Dict
- Hash结构默认采用
ZipList
编码,用以节省内存。ZipList
中相邻的两个entry
分别保存field
和value
- 当 ZipList 的元素个数小于 512(默认)且每个 Entry 的大小小于 64 字节(默认),则会使用 ZipList 来作为 Hash 的底层实现,否则则使用 Dict
Dict
Dict 中维护一个 哈希表 DitHashTable
,里面有一个 DictEntry 数组,用来存储实际的 key-value
,每个槽位下的 DictEntry 都会形成一个链表,处理哈希冲突
向 Dict 插入一个元素时,先会根据 key 计算出一个哈希值 h,然后计算 h & sizemask 得到数组下标,最后加入该 DictEntry 即可
数据结构体
字典(Dict)
typedef struct dict {
dictType *type; // dict 类型,不同的哈希函数
void *privdata; // 私有数据,做特殊hash的时候用
dictht ht[2]; // 包含两个hash表,一个是当前数据,另个为空,rehash的时候用
long rehashidx; // rehash的进度
int16_t pauserehash; // rehash是否暂停
} dict;
哈希表(DictHashTable)
typedef struct dictht {
dictEntry **table; // 指向entry 的数组
unsigned long size; // 哈希表大小
unsigned long sizemask; // 掩码:size - 1
unsigned long used; // entry 的个数
} dictht;
哈希节点(DictEntry)
typedef struct dictEntry {
void *key; // 键
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;// 值
struct dictEntry *next;// 下一个结点
} dictEntry;
Dict 的 rehash
不管是扩容还是收缩,必定会创建新的哈希表,导致哈希表的 size 和 sizemask 变化,而 key 的查询与 sizemask 有关。因此必须对哈希表中的每一个 key 重新计算索引,插入新的哈希表,这个过程称为 rehash。过程是这样的:
- 计算新hash表的realeSize,值取决于当前要做的是扩容还是收缩:
- 如果是扩容,则新size为第一个大于等于
dict.ht[0].used + 1
的2^n - 如果是收缩,则新size为第一个大于等于
dict.ht[0].used
的2^n (不得小于4) - 按照新的 realeSize 申请内存空间,创建 dictht,并赋值给 dict.ht[1]
- 设置 dict.rehashidx = 0,标示开始 rehash
- 将 dict.ht[0] 中的每一个 dictEntry 都 rehash 到 dict.ht[1]
- 将 dict.ht[1] 赋值给 dict.ht[0],给 dict.ht[1] 初始化为空哈希表,释放原来的dict.ht[0]的内存
- 将 rehashidx 赋值为 -1,代表 rehash 结束
- 在rehash过程中,新增操作,则直接写入ht[1] ,查询、修改和删除则会在dict.ht[0]和dict.ht[1]依次查找并执行。这样可以确保 ht[0] 的数据只减不增,随着 rehash 最终为空
Set
介绍
Set 是 Redis 中的无序集合,可以保证元素的唯一性,但不一定保证元素的有序性,可以对 set 取交集,并集等操作
常用命令
Set 常用操作
# 往集合key中存入元素,元素存在则忽略,若key不存在则新建
SADD key member [member ...]
# 从集合key中删除元素
SREM key member [member ...]
# 获取集合key中所有元素
SMEMBERS key
# 获取集合key中的元素个数
SCARD key
# 判断member元素是否存在于集合key中
SISMEMBER key member
# 从集合key中随机选出count个元素,元素不从key中删除
SRANDMEMBER key [count]
# 从集合key中随机选出count个元素,元素从key中删除
SPOP key [count]
Set 集合操作
# 交集运算
SINTER key [key ...]
# 将交集结果存入新集合destination中
SINTERSTORE destination key [key ...]
# 并集运算
SUNION key [key ...]
# 将并集结果存入新集合destination中
SUNIONSTORE destination key [key ...]
# 差集运算
SDIFF key [key ...]
# 将差集结果存入新集合destination中
SDIFFSTORE destination key [key ...]
使用场景
点赞
使用 set 集合来存储,key 为文章 id,value为 用户 id 即可,获取该文章的点赞数量取该 set 中 id 的总数即可
关注
使用 set 集合来存储,key 为用户 id,value 为该用户关注的用户的 id,可以对两个 set 取并集,就可以找出它们的共同关注
抽奖
使用 set 集合来存储,key 为活动 id,value 为参见该活动的用户
- 若不可重复抽奖,则使用
SPOP key [count]
来随机获取count
个用户,并从 set 中移除 - 若可以重复抽奖,则使用
SRANDMEMBER key [count]
来随机获取 count 个用户,但不会从 set 中移除
底层实现
Set 底层是通过 IntSet(整数集合)或者 Dict 来实现的
- 用 Dict 来存储实际上和 Hash 的结构一样,只不过把 value 值统一存储为 null 即可
- 当数据都是整数时,且数据的数量不超过
set-max-intset-entries
时,会使用intset
来存储数据 - 否则则使用
Dict
来存储
Intset
Intset 是 Redis 中的一种实现模式,基于整数数组来实现,具备长度可变,有序的特征
结构体
typedef struct intset {
uint32_t encoding; // 编码方式
uint32_t length; // 长度
int8_t contents[]; // 存储位置
} intset;
encoding
:表示存储整数的大小
/* Note that these encodings are ordered, so:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
length
:数组的长度
contents
:存储实际的内容
encoding
:4字节
length
:4字节
contents
:2字节 * 3 = 6 字节
扩容过程
我们向该其中添加一个数字:50000,这个数字超出了 int16_t 的范围,intset 会自动升级编码方式到合适的大小。
以当前案例来说流程如下:
- 升级编码为
INTSET_ENC_INT32
, 每个整数占 4 字节,并按照新的编码方式及元素个数扩容数组 - 倒序依次将数组中的元素拷贝到扩容后的正确位置
- 将待添加的元素放入数组末尾
- 最后,将 inset 的 encoding 属性改为
INTSET_ENC_INT32
,将length
属性改为 4
小结:
- intset 可以保证 contents 的整数有序,唯一
- 因为 contents 数组有序,所以可以使用 二分查找 来快速找到目标值
- 具备类型升级机制,可以节省内存空间
ZSet
介绍
ZSet 是一个有序集合类型,比 Set 多出了一个 score 字段,该字段用来对 value 值进行排序,保证 ZSet 中的值是有序的
所以每一个元素都有两个值,一个 score,用来排序,一个 value 存储实际的值
常用命令
ZSet 常用操作
# 往有序集合key中加入带分值元素
ZADD key score member [[score member]...]
# 往有序集合key中删除元素
ZREM key member [member...]
# 返回有序集合key中元素member的分值
ZSCORE key member
# 返回有序集合key中元素个数
ZCARD key
# 为有序集合key中元素member的分值加上increment
ZINCRBY key increment member
# 正序获取有序集合key从start下标到stop下标的元素
ZRANGE key start stop [WITHSCORES]
# 倒序获取有序集合key从start下标到stop下标的元素
ZREVRANGE key start stop [WITHSCORES]
# 返回有序集合中指定分数区间内的成员,分数由低到高排序。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
# 返回指定成员区间内的成员,按字典正序排列, 分数必须相同。
ZRANGEBYLEX key min max [LIMIT offset count]
# 返回指定成员区间内的成员,按字典倒序排列, 分数必须相同
ZREVRANGEBYLEX key max min [LIMIT offset count]
ZSet 运算操作
# 并集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZUNIONSTORE destkey numberkeys key [key...]
# 交集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZINTERSTORE destkey numberkeys key [key...]
使用场景
排行榜
以文章排行榜为例子,使用 article:rank 作为 key,点赞数量作为 score,文章的 id 作为 value 即可,根据 score 从大到小进行排序,则可以得到一个 文章热度的排行榜。
底层实现
ZSet 使用 压缩列表 或者 跳表 + 字典来实现
- 可以根据 key 找到 value;
- 保证 value 是唯一的;
- 可以根据 value 查找分数;
Ziplist 本身是没有排序功能的,而且没有键值对的概念,需要通过编码来实现:
- ziplist 是连续内存,score 和 value 是相互依靠的两个 entry;
- ziplist 本身不能排序,所以只能通过 score 来对 entry 从小到大排序;
当同时满足一下条件时,会选择使用 Ziplist
- 元素的数量小于
zset_max_ziplist_entries
,默认是 256 - 每个元素的内存大小小于
zset_max_ziplist_value
字节,默认是 64 KB
否则使用 跳表 +Dict
使用 Dict 存储 value 和 score 的映射关系;
使用 跳表 来有序的存储 score 和 value 值,便于范围查找
SkipList
跳表是一个并联多个有序链表的数据结构,它可以在 O(logn) 的时间复杂度内完成插入、删除、增加的操作;
特点
- 跳表是一个双向链表,每个节点包含 score 和 element 值
- 节点按 score 进行排序
- 每个节点都可以包含多层指针,层数是 1 到 32 之间的随机数
- 不同层指针到下一个节点的跨度不同,层级越高,跨度越大
- 增删改查效率与红黑树基本一致,实现却更简单
跳表代码
public class SkipList {
private final int MAX_LEVEL = 32;
private final double factor = 0.25;
private int level = 1;
/**
* 头结点
*/
private final Node head = new Node(null, MAX_LEVEL);
static class Node {
Integer value;
Node[] next;
int size;
public Node(Integer value, int size) {
this.value = value;
this.size = size;
this.next = new Node[size];
}
}
public boolean search(int value) {
Node p = head;
for (int i = level - 1; i >= 0; i--) {
while (p.next[i] != null && p.next[i].value <= value) {
if (p.next[i].value == value) {
return true;
}
p = p.next[i];
}
}
return false;
}
public void insert(int value) {
int randomLevel = randomLevel();
Node newNode = new Node(value, randomLevel);
Node p = head;
for (int i = level - 1; i >= 0; i--) {
while (p.next[i] != null && p.next[i].value <= value) {
p = p.next[i];
}
if (p.next[i] == null) {
p.next[i] = newNode;
} else {
newNode.next[i] = p.next[i];
p.next[i] = newNode;
}
}
if (randomLevel > level) {
for (int i = level ; i < randomLevel ; i++) {
head.next[i] = newNode;
}
level = randomLevel;
}
}
public boolean delete(int value) {
boolean contain = search(value);
Node p = head;
Node h = null;
if (contain) {
for (int i = level - 1; i >= 0; i--) {
while (p.next[i] != null && p.next[i].value <= value) {
h = p;
p = p.next[i];
}
h.next[i] = p.next[i];
}
}
return contain;
}
private int randomLevel() {
int level = 1;
Random random = new Random();
while (random.nextDouble() <= factor && level <= MAX_LEVEL) {
level++;
}
return level;
}
}