golang源码分析:miniredis

Golang
204
0
0
2024-01-19
标签   Redis

做单元测试的时候,我们需要打桩 mock 掉一些中间件。miniredis 是 mock 掉 redis 的一个利器:它既可以不经过网络、直接在本地 mock,也可以通过 TCP 请求、经由完整的 redis 协议来 mock redis。首先看下如何使用:

package main
import (
  "github.com/alicebob/miniredis/v2"
  "github.com/gomodule/redigo/redis"


  "fmt"
  "testing"
  "time"
)


// TestSomething shows the two ways of talking to a miniredis instance:
// directly through the *Miniredis helper methods (Set, HSet, Get, ...), and
// over TCP with a real redis client (redigo here).
func TestSomething(t *testing.T) {
  s := miniredis.RunT(t)

  // Optionally set some keys your code expects:
  s.Set("foo", "bar")
  s.HSet("some", "other", "key")

  // Run your code and see if it behaves.
  // An example using the redigo library from "github.com/gomodule/redigo/redis":
  c, err := redis.Dial("tcp", s.Addr())
  if err != nil {
    // Without a connection every following call would dereference a nil
    // client, so stop the test right away.
    t.Fatal(fmt.Errorf("dial miniredis at %s: %w", s.Addr(), err))
  }
  defer c.Close()

  if _, err = c.Do("SET", "foo", "bar"); err != nil {
    t.Fatal(fmt.Errorf("SET foo over the wire: %w", err))
  }

  // Optionally check values in redis...
  if got, err := s.Get("foo"); err != nil || got != "bar" {
    t.Error("'foo' has the wrong value")
  }
  // ... or use a helper for that:
  s.CheckGet(t, "foo", "bar")

  // TTL and expiration:
  s.Set("foo", "bar")
  s.SetTTL("foo", 10*time.Second)
  // FastForward moves miniredis' clock past the 10s TTL set above.
  s.FastForward(11 * time.Second)
  if s.Exists("foo") {
    t.Fatal("'foo' should not have existed anymore")
  }
}

可以通过 s := miniredis.RunT(t) 启动一个模拟的 redis 服务器,然后直接在本地设置值:s.Set("foo", "bar")。

当然也可以通过redis协议进行远程设置,比如我们通过 "github.com/gomodule/redigo/redis"的redis客户端来进行设置。

  c, err := redis.Dial("tcp", s.Addr())
  _, err = c.Do("SET", "foo", "bar")

接着我们分析下看它的源码是如何实现的:

s := miniredis.RunT(t) 本质上是启动了一个 tcp 服务器,然后注册了一系列处理函数,针对不同的 redis 命令分别进行处理。server 的初始化位于 github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go

func RunT(t Tester) *Miniredis {
  m := NewMiniRedis()
  if err := m.Start(); err != nil {
  t.Cleanup(m.Close)
func NewMiniRedis() *Miniredis {
  m := Miniredis{
    dbs:         map[int]*RedisDB{},
    scripts:     map[string]string{},
    subscribers: map[*Subscriber]struct{}{},
  }
  m.signal = sync.NewCond(&m)

核心结构体定义如下:

// RedisDB holds the data of a single numbered database. Every key is
// recorded in keys together with its type name; the value itself lives in the
// per-type map that matches that type.
type RedisDB struct {
  master        *Miniredis               // pointer to the lock in Miniredis
  id            int                      // db id
  keys          map[string]string        // Master map of keys with their type
  stringKeys    map[string]string        // GET/SET &c. keys
  hashKeys      map[string]hashKey       // HGET/HSET &c. keys
  listKeys      map[string]listKey       // LPUSH &c. keys
  setKeys       map[string]setKey        // SADD &c. keys
  hllKeys       map[string]*hll          // PFADD &c. keys
  sortedsetKeys map[string]sortedSet     // ZADD &c. keys
  streamKeys    map[string]*streamKey    // XADD &c. keys
  ttl           map[string]time.Duration // effective TTL values
  keyVersion    map[string]uint          // used to watch values
}
// Miniredis is the in-memory redis stand-in. It embeds the mutex that guards
// all databases (each RedisDB locks through its master pointer) and owns the
// TCP server that the command handlers are registered on.
type Miniredis struct {
  sync.Mutex
  // srv is the TCP server the redis command handlers are registered on.
  srv         *server.Server
  // port is formatted into the "127.0.0.1:<port>" listen address by Start.
  port        int
  passwords   map[string]string // username password
  dbs         map[int]*RedisDB
  selectedDB  int               // DB id used in the direct Get(), Set() &c.
  scripts     map[string]string // sha1 -> lua src
  // signal is a condition variable built on the embedded mutex; RedisDB.Set
  // broadcasts on it after changing a key.
  signal      *sync.Cond
  now         time.Time // time.Now() if not set.
  subscribers map[*Subscriber]struct{}
  // NOTE(review): rand, Ctx and CtxCancel are not used in the code shown
  // here; their exact role should be confirmed in the full source.
  rand        *rand.Rand
  Ctx         context.Context
  CtxCancel   context.CancelFunc
}

然后启动server开始服务

// Start creates a server listening on 127.0.0.1 at m.port and hands it to
// m.start. An error from server.NewServer is returned unchanged.
func (m *Miniredis) Start() error {
  addr := fmt.Sprintf("127.0.0.1:%d", m.port)
  srv, err := server.NewServer(addr)
  if err != nil {
    return err
  }
  return m.start(srv)
}

github.com/alicebob/miniredis/v2@v2.30.2/server/server.go

// Server is the TCP front end: it accepts connections on l, reads commands
// from each peer and dispatches them to the handler registered for that
// command name.
type Server struct {
  // l is the listener the accept loop runs on.
  l         net.Listener
  // cmds maps an upper-cased command name to its handler (filled by Register).
  cmds      map[string]Cmd
  // preHook is read by Dispatch before the command handler is looked up.
  preHook   Hook
  // peers is the set of connections currently being served.
  peers     map[net.Conn]struct{}
  mu        sync.Mutex
  // wg tracks the accept-loop goroutine and the per-connection goroutines.
  wg        sync.WaitGroup
  // NOTE(review): infoConns / infoCmds look like connection and command
  // counters, but their use is not shown here; confirm in the full source.
  infoConns int
  infoCmds  int
}

每接受一个连接,就起一个协程来处理该连接上的请求:

func newServer(l net.Listener) *Server {
  s := Server{
    cmds:  map[string]Cmd{},
    peers: map[net.Conn]struct{}{},
    l:     l,
  }
          s.wg.Add(1)
  go func() {
    defer s.wg.Done()
    s.serve(l)
func (s *Server) serve(l net.Listener) {
        for {
    conn, err := l.Accept()
    if err != nil {
      return
    }
    s.ServeConn(conn)  
func (s *Server) ServeConn(conn net.Conn) {
        s.peers[conn] = struct{}{}
        go func() {
    defer s.wg.Done()
    defer conn.Close()


    s.servePeer(conn)

解析请求命令,然后分发进行处理:

func (s *Server) servePeer(c net.Conn) {
          go func() {
    defer close(readCh)


    for {
      args, err := readArray(r)
      if err != nil {
        peer.Close()
        return
      }


      readCh <- args
          for args := range readCh {
    s.Dispatch(peer, args)
    peer.Flush()  
func (s *Server) Dispatch(c *Peer, args []string) {
        cb, ok := s.cmds[cmdUp]
        cb(c, cmdUp, args)

github.com/alicebob/miniredis/v2@v2.30.2/server/proto.go 中的 readArray 会解析 redis 协议,得到一个个命令:

func readArray(rd *bufio.Reader) ([]string, error) {
  line, err := rd.ReadString('\n')
        switch line[0] {
  default:
    return nil, ErrProtocol
  case '*':

比如执行命令s.Set("foo", "bar")的过程,本质上是将key对应的val存入一个map,其具体过程如下:

github.com/alicebob/miniredis/v2@v2.30.2/direct.go

// Set stores v under key k in the database selected by m.selectedDB by
// delegating to that RedisDB's Set, and returns its error.
func (m *Miniredis) Set(k, v string) error {
  selected := m.DB(m.selectedDB)
  return selected.Set(k, v)
}
// DB returns the database with id i. It takes the Miniredis mutex for the
// duration of the lookup and delegates to the lock-free db helper.
func (m *Miniredis) DB(i int) *RedisDB {
  m.Lock()
  defer m.Unlock()

  found := m.db(i)
  return found
}
// db returns the database with id i, creating and registering it in m.dbs
// on first use. It takes no locks itself; DB calls it while holding the mutex.
func (m *Miniredis) db(i int) *RedisDB {
  existing, found := m.dbs[i]
  if found {
    return existing
  }

  // The new DB keeps a pointer back to m, whose mutex guards it.
  created := newRedisDB(i, m)
  ptr := &created
  m.dbs[i] = ptr
  return ptr
}

github.com/alicebob/miniredis/v2@v2.30.2/direct.go先定位到db然后设置值:

func (db *RedisDB) Set(k, v string) error {
          db.master.Lock()
  defer db.master.Unlock()
  defer db.master.signal.Broadcast()
          if db.exists(k) && db.t(k) != "string" {
    return ErrWrongType
  }
        db.del(k, true) // Remove expire
  db.stringSet(k, v)  
func (db *RedisDB) del(k string, delTTL bool) {
          t := db.t(k)
  delete(db.keys, k)
        switch t {
  case "string":
    delete(db.stringKeys, k)
  case "hash":
    delete(db.hashKeys, k)
  case "list":
    delete(db.listKeys, k)
  case "set":
    delete(db.setKeys, k)
  case "zset":
    delete(db.sortedsetKeys, k)
  case "stream":
    delete(db.streamKeys, k)
  case "hll":
    delete(db.hllKeys, k)
  default:
    panic("Unknown key type: " + t)
  }
      // t returns the type name recorded for key k in the master key map,
      // or "" when the key is not present.
      func (db *RedisDB) t(k string) string {
  kind := db.keys[k]
  return kind
}

github.com/alicebob/miniredis/v2@v2.30.2/db.go比如字符串的设置如下:

func (db *RedisDB) stringSet(k, v string) {
          db.del(k, false)
  db.keys[k] = "string"
  db.stringKeys[k] = v
  db.keyVersion[k]++

比如我们用 redis 客户端建立连接(c, err := redis.Dial("tcp", s.Addr())),并发送命令 _, err = c.Do("SET", "foo", "bar") 后,server 端会接收到该命令,然后用提前注册好的命令处理函数来处理。相关代码位于 github.com/alicebob/miniredis/v2@v2.30.2/server/server.go

func NewServer(addr string) (*Server, error) {
  l, err := net.Listen("tcp", addr)  
func newServer(l net.Listener) *Server {
        s.serve(l)

在这里会取出对应的命令处理函数:

func (s *Server) Dispatch(c *Peer, args []string) {
        h := s.preHook
        cb, ok := s.cmds[cmdUp]
        cb(c, cmdUp, args)

命令的注册函数是:

  func (s *Server) Register(cmd string, f Cmd) error {
        cmd = strings.ToUpper(cmd)
        s.cmds[cmd] = f

命令注册的位置位于:github.com/alicebob/miniredis/v2@v2.30.2/cmd_generic.go

func commandsGeneric(m *Miniredis) {
  m.srv.Register("COPY", m.cmdCopy)
  m.srv.Register("DEL", m.cmdDel)

github.com/alicebob/miniredis/v2@v2.30.2/cmd_string.go

func commandsString(m *Miniredis) {
        m.srv.Register("GET", m.cmdGet)

比如get命令的处理函数如下

func (m *Miniredis) cmdGet(c *server.Peer, cmd string, args []string) {
          if !m.handleAuth(c) {
    return
  }
  if m.checkPubsub(c, cmd) {
    return
  }
        withTx(m, c, func(c *server.Peer, ctx *connCtx) {
    db := m.db(ctx.selectedDB)


        c.WriteBulk(db.stringGet(key))
// stringGet returns the string stored under k. It yields "" when k is not
// in the master key map or is recorded with a type other than "string".
func (db *RedisDB) stringGet(k string) string {
  kind, found := db.keys[k]
  if found && kind == "string" {
    return db.stringKeys[k]
  }
  return ""
}

命令注册发生在服务启动的过程中github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go

func (m *Miniredis) start(s *server.Server) error {
          commandsConnection(m)
  commandsGeneric(m)
  commandsServer(m)
  commandsString(m)