做单元测试的时候,我们需要打桩mock掉一些中间件,miniredis是mock掉redis的一个利器它既可以通过非网络本地mock,也可通过tcp请求,经过redis协议完整mockredis代码,首先看下如何使用:
package main | |
import ( | |
"github.com/alicebob/miniredis/v2" | |
"github.com/gomodule/redigo/redis" | |
"fmt" | |
"testing" | |
"time" | |
) | |
func TestSomething(t *testing.T) { | |
s := miniredis.RunT(t) | |
// Optionally set some keys your code expects: | |
s.Set("foo", "bar") | |
s.HSet("some", "other", "key") | |
// Run your code and see if it behaves. | |
// An example using the redigo library from "github.com/gomodule/redigo/redis": | |
c, err := redis.Dial("tcp", s.Addr()) | |
fmt.Println(err) | |
_, err = c.Do("SET", "foo", "bar") | |
// Optionally check values in redis... | |
if got, err := s.Get("foo"); err != nil || got != "bar" { | |
t.Error("'foo' has the wrong value") | |
} | |
// ... or use a helper for that: | |
s.CheckGet(t, "foo", "bar") | |
// TTL and expiration: | |
s.Set("foo", "bar") | |
s.SetTTL("foo", 10*time.Second) | |
s.FastForward(11 * time.Second) | |
if s.Exists("foo") { | |
t.Fatal("'foo' should not have existed anymore") | |
} | |
} |
可以通过 s := miniredis.RunT(t)启动一个模拟的redis server服务器,然后直接本地set值 s.Set("foo", "bar")
当然也可以通过redis协议进行远程设置,比如我们通过 "github.com/gomodule/redigo/redis"的redis客户端来进行设置。
c, err := redis.Dial("tcp", s.Addr()) | |
_, err = c.Do("SET", "foo", "bar") |
接着我们分析下看它的源码是如何实现的:
s := miniredis.RunT(t)本质是是启动了一个tcp服务器,然后注册了一系列处理函数,根据每个不同的redis命令进行了不同的处理,server的初始化位于github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func RunT(t Tester) *Miniredis { | |
m := NewMiniRedis() | |
if err := m.Start(); err != nil { | |
t.Cleanup(m.Close) | |
func NewMiniRedis() *Miniredis { | |
m := Miniredis{ | |
dbs: map[int]*RedisDB{}, | |
scripts: map[string]string{}, | |
subscribers: map[*Subscriber]struct{}{}, | |
} | |
m.signal = sync.NewCond(&m) |
核心结构体定义如下:
type RedisDB struct { | |
master *Miniredis // pointer to the lock in Miniredis | |
id int // db id | |
keys map[string]string // Master map of keys with their type | |
stringKeys map[string]string // GET/SET &c. keys | |
hashKeys map[string]hashKey // MGET/MSET &c. keys | |
listKeys map[string]listKey // LPUSH &c. keys | |
setKeys map[string]setKey // SADD &c. keys | |
hllKeys map[string]*hll // PFADD &c. keys | |
sortedsetKeys map[string]sortedSet // ZADD &c. keys | |
streamKeys map[string]*streamKey // XADD &c. keys | |
ttl map[string]time.Duration // effective TTL values | |
keyVersion map[string]uint // used to watch values | |
} | |
type Miniredis struct { | |
sync.Mutex | |
srv *server.Server | |
port int | |
passwords map[string]string // username password | |
dbs map[int]*RedisDB | |
selectedDB int // DB id used in the direct Get(), Set() &c. | |
scripts map[string]string // sha1 -> lua src | |
signal *sync.Cond | |
now time.Time // time.Now() if not set. | |
subscribers map[*Subscriber]struct{} | |
rand *rand.Rand | |
Ctx context.Context | |
CtxCancel context.CancelFunc | |
} |
然后启动server开始服务
func (m *Miniredis) Start() error { | |
s, err := server.NewServer(fmt.Sprintf("127.0.0.1:%d", m.port)) | |
if err != nil { | |
return err | |
} | |
return m.start(s) | |
} |
github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
type Server struct { | |
l net.Listener | |
cmds map[string]Cmd | |
preHook Hook | |
peers map[net.Conn]struct{} | |
mu sync.Mutex | |
wg sync.WaitGroup | |
infoConns int | |
infoCmds int | |
} |
每接受一个请求,起一个协程来处理请求:
func newServer(l net.Listener) *Server { | |
s := Server{ | |
cmds: map[string]Cmd{}, | |
peers: map[net.Conn]struct{}{}, | |
l: l, | |
} | |
s.wg.Add(1) | |
go func() { | |
defer s.wg.Done() | |
s.serve(l) | |
func (s *Server) serve(l net.Listener) { | |
for { | |
conn, err := l.Accept() | |
if err != nil { | |
return | |
} | |
s.ServeConn(conn) | |
func (s *Server) ServeConn(conn net.Conn) { | |
s.peers[conn] = struct{}{} | |
go func() { | |
defer s.wg.Done() | |
defer conn.Close() | |
s.servePeer(conn) |
解析请求命令,然后分发进行处理:
func (s *Server) servePeer(c net.Conn) { | |
go func() { | |
defer close(readCh) | |
for { | |
args, err := readArray(r) | |
if err != nil { | |
peer.Close() | |
return | |
} | |
readCh <- args | |
for args := range readCh { | |
s.Dispatch(peer, args) | |
peer.Flush() | |
func (s *Server) Dispatch(c *Peer, args []string) { | |
cb, ok := s.cmds[cmdUp] | |
cb(c, cmdUp, args) |
github.com/alicebob/miniredis/v2@v2.30.2/server/proto.go其中会解析redis协议得到一个个命令:
func readArray(rd *bufio.Reader) ([]string, error) { | |
line, err := rd.ReadString('\n') | |
switch line[0] { | |
default: | |
return nil, ErrProtocol | |
case '*': |
比如执行命令s.Set("foo", "bar")的过程,本质上是将key对应的val存入一个map,其具体过程如下:
github.com/alicebob/miniredis/v2@v2.30.2/direct.go
func (m *Miniredis) Set(k, v string) error { | |
return m.DB(m.selectedDB).Set(k, v) | |
} | |
func (m *Miniredis) DB(i int) *RedisDB { | |
m.Lock() | |
defer m.Unlock() | |
return m.db(i) | |
} | |
// get DB. No locks! | |
func (m *Miniredis) db(i int) *RedisDB { | |
if db, ok := m.dbs[i]; ok { | |
return db | |
} | |
db := newRedisDB(i, m) // main miniredis has our mutex. | |
m.dbs[i] = &db | |
return &db | |
} |
github.com/alicebob/miniredis/v2@v2.30.2/direct.go先定位到db然后设置值:
func (db *RedisDB) Set(k, v string) error { | |
db.master.Lock() | |
defer db.master.Unlock() | |
defer db.master.signal.Broadcast() | |
if db.exists(k) && db.t(k) != "string" { | |
return ErrWrongType | |
} | |
db.del(k, true) // Remove expire | |
db.stringSet(k, v) | |
func (db *RedisDB) del(k string, delTTL bool) { | |
t := db.t(k) | |
delete(db.keys, k) | |
switch t { | |
case "string": | |
delete(db.stringKeys, k) | |
case "hash": | |
delete(db.hashKeys, k) | |
case "list": | |
delete(db.listKeys, k) | |
case "set": | |
delete(db.setKeys, k) | |
case "zset": | |
delete(db.sortedsetKeys, k) | |
case "stream": | |
delete(db.streamKeys, k) | |
case "hll": | |
delete(db.hllKeys, k) | |
default: | |
panic("Unknown key type: " + t) | |
} | |
func (db *RedisDB) t(k string) string { | |
return db.keys[k] | |
} |
github.com/alicebob/miniredis/v2@v2.30.2/db.go比如字符串的设置如下:
func (db *RedisDB) stringSet(k, v string) { | |
db.del(k, false) | |
db.keys[k] = "string" | |
db.stringKeys[k] = v | |
db.keyVersion[k]++ |
比如我们用redis客户端连接后c, err := redis.Dial("tcp", s.Addr()) 发送命令_, err = c.Do("SET", "foo", "bar")后,server端会接受命令,然后用提前注册好的命令处理函数来处理github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
func NewServer(addr string) (*Server, error) { | |
l, err := net.Listen("tcp", addr) | |
func newServer(l net.Listener) *Server { | |
s.serve(l) |
在这里会取出对应的命令处理函数:
func (s *Server) Dispatch(c *Peer, args []string) { | |
h := s.preHook | |
cb, ok := s.cmds[cmdUp] | |
cb(c, cmdUp, args) |
命令的注册函数是:
func (s *Server) Register(cmd string, f Cmd) error { | |
cmd = strings.ToUpper(cmd) | |
s.cmds[cmd] = f |
命令注册的位置位于:github.com/alicebob/miniredis/v2@v2.30.2/cmd_generic.go
func commandsGeneric(m *Miniredis) { | |
m.srv.Register("COPY", m.cmdCopy) | |
m.srv.Register("DEL", m.cmdDel) |
github.com/alicebob/miniredis/v2@v2.30.2/cmd_string.go
func commandsString(m *Miniredis) { | |
m.srv.Register("GET", m.cmdGet) |
比如get命令的处理函数如下
func (m *Miniredis) cmdGet(c *server.Peer, cmd string, args []string) { | |
if !m.handleAuth(c) { | |
return | |
} | |
if m.checkPubsub(c, cmd) { | |
return | |
} | |
withTx(m, c, func(c *server.Peer, ctx *connCtx) { | |
db := m.db(ctx.selectedDB) | |
c.WriteBulk(db.stringGet(key)) | |
func (db *RedisDB) stringGet(k string) string { | |
if t, ok := db.keys[k]; !ok || t != "string" { | |
return "" | |
} | |
return db.stringKeys[k] | |
} |
命令注册发生在服务启动的过程中github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func (m *Miniredis) start(s *server.Server) error { | |
commandsConnection(m) | |
commandsGeneric(m) | |
commandsServer(m) | |
commandsString(m) |