做单元测试的时候,我们需要打桩mock掉一些中间件。miniredis是mock掉redis的一个利器:它既可以不经过网络、直接在本地mock,也可以通过tcp请求、走完整的redis协议来mock redis。首先看下如何使用:
package main
import (
"github.com/alicebob/miniredis/v2"
"github.com/gomodule/redigo/redis"
"fmt"
"testing"
"time"
)
// TestSomething shows the two ways of driving miniredis: directly through the
// *Miniredis helper methods, and over TCP with the redigo client.
func TestSomething(t *testing.T) {
	s := miniredis.RunT(t)

	// Optionally set some keys your code expects:
	if err := s.Set("foo", "bar"); err != nil {
		t.Fatal(fmt.Errorf("seed foo: %w", err))
	}
	s.HSet("some", "other", "key")

	// Run your code and see if it behaves.
	// An example using the redigo library from "github.com/gomodule/redigo/redis":
	c, err := redis.Dial("tcp", s.Addr())
	if err != nil {
		// Stop here: without a connection c is unusable and c.Do would panic.
		t.Fatal(fmt.Errorf("dial miniredis at %s: %w", s.Addr(), err))
	}
	defer c.Close()

	if _, err = c.Do("SET", "foo", "bar"); err != nil {
		t.Fatal(fmt.Errorf("SET foo over tcp: %w", err))
	}

	// Optionally check values in redis...
	if got, err := s.Get("foo"); err != nil || got != "bar" {
		t.Error("'foo' has the wrong value")
	}
	// ... or use a helper for that:
	s.CheckGet(t, "foo", "bar")

	// TTL and expiration:
	if err := s.Set("foo", "bar"); err != nil {
		t.Fatal(fmt.Errorf("reset foo: %w", err))
	}
	s.SetTTL("foo", 10*time.Second)
	s.FastForward(11 * time.Second)
	if s.Exists("foo") {
		t.Fatal("'foo' should not have existed anymore")
	}
}
可以通过 s := miniredis.RunT(t)启动一个模拟的redis server服务器,然后直接本地set值 s.Set("foo", "bar")
当然也可以通过redis协议进行远程设置,比如我们通过 "github.com/gomodule/redigo/redis"的redis客户端来进行设置。
c, err := redis.Dial("tcp", s.Addr())
_, err = c.Do("SET", "foo", "bar")
接着我们分析下看它的源码是如何实现的:
s := miniredis.RunT(t)本质上是启动了一个tcp服务器,然后注册了一系列处理函数,针对每个不同的redis命令进行不同的处理。server的初始化位于github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func RunT(t Tester) *Miniredis {
m := NewMiniRedis()
if err := m.Start(); err != nil {
t.Cleanup(m.Close)
func NewMiniRedis() *Miniredis {
m := Miniredis{
dbs: map[int]*RedisDB{},
scripts: map[string]string{},
subscribers: map[*Subscriber]struct{}{},
}
m.signal = sync.NewCond(&m)
核心结构体定义如下:
// RedisDB holds the data of one numbered database of a Miniredis instance.
// keys records the type name of every stored key; the value itself lives in
// the per-type map that matches that type name.
type RedisDB struct {
master *Miniredis // owning Miniredis; its mutex is the lock used for this DB
id int // db id
keys map[string]string // master map: key -> type name (e.g. "string", "hash")
stringKeys map[string]string // GET/SET &c. keys
hashKeys map[string]hashKey // HSET &c. keys (type "hash")
listKeys map[string]listKey // LPUSH &c. keys
setKeys map[string]setKey // SADD &c. keys
hllKeys map[string]*hll // PFADD &c. keys
sortedsetKeys map[string]sortedSet // ZADD &c. keys
streamKeys map[string]*streamKey // XADD &c. keys
ttl map[string]time.Duration // effective TTL values
keyVersion map[string]uint // per-key counter bumped on writes; used to watch values
}
// Miniredis is the state of one mock redis server. The embedded sync.Mutex
// is the single lock taken by the direct helpers (DB, Set, ...) before they
// touch the databases.
type Miniredis struct {
sync.Mutex
srv *server.Server // TCP server on which the command handlers are registered
port int // port used by Start to build the 127.0.0.1 listen address
passwords map[string]string // username password
dbs map[int]*RedisDB // databases by id, created on first access by db()
selectedDB int // DB id used in the direct Get(), Set() &c.
scripts map[string]string // sha1 -> lua src
signal *sync.Cond // condition variable built on this Miniredis' mutex; broadcast after writes
now time.Time // time.Now() if not set.
subscribers map[*Subscriber]struct{} // set of subscribers
rand *rand.Rand // NOTE(review): presumably the source for random commands — confirm
Ctx context.Context // NOTE(review): lifetime/owner not visible here — confirm
CtxCancel context.CancelFunc // NOTE(review): presumably cancels Ctx — confirm
}
然后启动server开始服务
// Start creates a server listening on 127.0.0.1 at m.port and passes it to
// m.start. An error from server.NewServer is returned unchanged.
func (m *Miniredis) Start() error {
	addr := fmt.Sprintf("127.0.0.1:%d", m.port)
	srv, err := server.NewServer(addr)
	if err == nil {
		return m.start(srv)
	}
	return err
}
github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
// Server is the TCP front end: it accepts connections on l and dispatches
// each parsed command to the handler registered for it in cmds.
type Server struct {
l net.Listener // listener the accept loop runs on
cmds map[string]Cmd // upper-cased command name -> handler
preHook Hook // hook read by Dispatch before the command handler is looked up
peers map[net.Conn]struct{} // set of connections being served
mu sync.Mutex // NOTE(review): presumably guards the maps/counters above — confirm
wg sync.WaitGroup // tracks the accept-loop goroutine and the per-connection goroutines
infoConns int // NOTE(review): presumably a connection counter — confirm
infoCmds int // NOTE(review): presumably a command counter — confirm
}
每接受一个连接,起一个协程来处理该连接上的请求:
func newServer(l net.Listener) *Server {
s := Server{
cmds: map[string]Cmd{},
peers: map[net.Conn]struct{}{},
l: l,
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.serve(l)
func (s *Server) serve(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
return
}
s.ServeConn(conn)
func (s *Server) ServeConn(conn net.Conn) {
s.peers[conn] = struct{}{}
go func() {
defer s.wg.Done()
defer conn.Close()
s.servePeer(conn)
解析请求命令,然后分发进行处理:
func (s *Server) servePeer(c net.Conn) {
go func() {
defer close(readCh)
for {
args, err := readArray(r)
if err != nil {
peer.Close()
return
}
readCh <- args
for args := range readCh {
s.Dispatch(peer, args)
peer.Flush()
func (s *Server) Dispatch(c *Peer, args []string) {
cb, ok := s.cmds[cmdUp]
cb(c, cmdUp, args)
github.com/alicebob/miniredis/v2@v2.30.2/server/proto.go 中会解析redis协议,得到一个个命令:
func readArray(rd *bufio.Reader) ([]string, error) {
line, err := rd.ReadString('\n')
switch line[0] {
default:
return nil, ErrProtocol
case '*':
比如执行命令s.Set("foo", "bar")的过程,本质上是将key对应的val存入一个map,其具体过程如下:
github.com/alicebob/miniredis/v2@v2.30.2/direct.go
// Set stores v under the string key k in the currently selected database and
// returns whatever error the database's Set reports.
func (m *Miniredis) Set(k, v string) error {
	target := m.DB(m.selectedDB)
	return target.Set(k, v)
}
// DB returns the database with id i. It holds the Miniredis mutex for the
// duration of the lookup, which is delegated to the lock-free db helper.
func (m *Miniredis) DB(i int) *RedisDB {
	m.Lock()
	defer m.Unlock()

	selected := m.db(i)
	return selected
}
// db returns the database with id i, creating and registering it on first
// use. It takes no locks itself; the caller must already hold the mutex.
func (m *Miniredis) db(i int) *RedisDB {
	existing, found := m.dbs[i]
	if found {
		return existing
	}

	// The new DB keeps a pointer back to m, whose mutex protects it.
	created := newRedisDB(i, m)
	ptr := &created
	m.dbs[i] = ptr
	return ptr
}
github.com/alicebob/miniredis/v2@v2.30.2/direct.go 中先定位到db,然后设置值:
func (db *RedisDB) Set(k, v string) error {
db.master.Lock()
defer db.master.Unlock()
defer db.master.signal.Broadcast()
if db.exists(k) && db.t(k) != "string" {
return ErrWrongType
}
db.del(k, true) // Remove expire
db.stringSet(k, v)
func (db *RedisDB) del(k string, delTTL bool) {
t := db.t(k)
delete(db.keys, k)
switch t {
case "string":
delete(db.stringKeys, k)
case "hash":
delete(db.hashKeys, k)
case "list":
delete(db.listKeys, k)
case "set":
delete(db.setKeys, k)
case "zset":
delete(db.sortedsetKeys, k)
case "stream":
delete(db.streamKeys, k)
case "hll":
delete(db.hllKeys, k)
default:
panic("Unknown key type: " + t)
}
// t returns the type name recorded for key k in the master key map, or the
// empty string when k is not present.
func (db *RedisDB) t(k string) string {
	kind := db.keys[k]
	return kind
}
github.com/alicebob/miniredis/v2@v2.30.2/db.go 中,比如字符串的设置如下:
func (db *RedisDB) stringSet(k, v string) {
db.del(k, false)
db.keys[k] = "string"
db.stringKeys[k] = v
db.keyVersion[k]++
比如我们用redis客户端连接后 c, err := redis.Dial("tcp", s.Addr()),发送命令 _, err = c.Do("SET", "foo", "bar") 后,server端会接收命令,然后用提前注册好的命令处理函数来处理,代码位于 github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
func NewServer(addr string) (*Server, error) {
l, err := net.Listen("tcp", addr)
func newServer(l net.Listener) *Server {
s.serve(l)
在这里会取出对应的命令处理函数:
func (s *Server) Dispatch(c *Peer, args []string) {
h := s.preHook
cb, ok := s.cmds[cmdUp]
cb(c, cmdUp, args)
命令的注册函数是:
func (s *Server) Register(cmd string, f Cmd) error {
cmd = strings.ToUpper(cmd)
s.cmds[cmd] = f
命令注册的位置位于:github.com/alicebob/miniredis/v2@v2.30.2/cmd_generic.go
func commandsGeneric(m *Miniredis) {
m.srv.Register("COPY", m.cmdCopy)
m.srv.Register("DEL", m.cmdDel)
github.com/alicebob/miniredis/v2@v2.30.2/cmd_string.go
func commandsString(m *Miniredis) {
m.srv.Register("GET", m.cmdGet)
比如get命令的处理函数如下
func (m *Miniredis) cmdGet(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c, cmd) {
return
}
withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
c.WriteBulk(db.stringGet(key))
// stringGet returns the value stored under k when k exists and has type
// "string"; for a missing key or a key of another type it returns "".
func (db *RedisDB) stringGet(k string) string {
	if kind, found := db.keys[k]; found && kind == "string" {
		return db.stringKeys[k]
	}
	return ""
}
命令注册发生在服务启动的过程中,代码位于 github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func (m *Miniredis) start(s *server.Server) error {
commandsConnection(m)
commandsGeneric(m)
commandsServer(m)
commandsString(m)