玩转Golang的channel,二百行代码实现PubSub模式

Golang
332
0
0
2022-07-03

引言

PubSub(Publish/Subscribe)模式,,意为“发布/订阅”模式,是为了解决一对多的依赖关系,使多个消费者同时监听某一个主题,不仅可以让生产者和消费者解耦,同时也让不同的消费者之间相互解耦(注:有些反模式依赖订阅者执行的先后顺序,使用共享数据来传递状态,是需要避免的,因为这样会使消费者耦合在一起,不能独立变化)。这其中的关键就在于需要有中介来维护订阅关系,并负责把生产的消息,传递给订阅方。

在Golang这门语言中,channel天然就适合来当这个中介,下面就让我们一步步根据PubSub模式,实现工具类EventBus.

定义类型

首先,让我们先定义一些基本类型和核心操作。

//EventID是Event的唯一标识
type EventID int64

//Event
type Event interface {
    ID() EventID
}

//EventHandler
type EventHandler interface {
    OnEvent(ctx context.Context, event Event) error
    CanAutoRetry(err error) bool
}

// JobStatus holds information related to a job status
type JobStatus struct {
    RunAt      time.Time
    FinishedAt time.Time
    Err        error
}

//EventBus ...
type EventBus struct {}

func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) { }

func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler) { }

func (eb *EventBus) Publish(evt Event) <-chan JobStatus { }

重点拆解

首先,消费者要通过Subscribe来订阅相关的主题,这其中的重点就是需要根据EventID维护订阅的消费者,很自然的想到map,我们选择用handlers map[EventID][]EventHandler来维护,考虑到并发问题,还需要来加个锁。

//Subscribe ...
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    eb.handlers[eventID] = append(eb.handlers[eventID], handlers...)
}

这里实现的比较简单,没有考虑一个消费者,重复订阅的问题,留给了使用方自己处理。(但同一个消费者为什么要多次调用subcribe,订阅同一个主题呢,感觉是在写bug

下面就是最核心的Publish函数了,一方面一定是需要一个channel(最好是有buffer的)来传递Event数据,另一方面,为了保证性能,需要有一些常驻协程,来监听消息,并启动相关的消费者。以下是相关代码(在完整版代码里,添加了日志、错误处理等,这里为了展示重点,暂且隐去)

func (eb *EventBus) Start() {
    if eb.started {
        return
    }

    for i := 0; i < eb.eventWorkers; i++ {
        eb.wg.Add(1)
        go eb.eventWorker(eb.eventJobQueue)
    }

    eb.started = true
}


func (eb *EventBus) eventWorker(jobQueue <-chan EventJob) {
loop:
    for {
        select {
        case job := <-jobQueue:
            jobStatus := JobStatus{
                RunAt: time.Now(),
            }

            ctx, cancel := context.WithTimeout(context.Background(), eb.timeout)
            g, _ := errgroup.WithContext(ctx)
            for index := range job.handlers {
                handler := job.handlers[index]
                g.Go(func() error {
                    return eb.runHandler(ctx, handler, job.event)
                })
            }
            jobStatus.Err = g.Wait()

            jobStatus.FinishedAt = time.Now()

            select {
            case job.resultChan <- jobStatus:
            default:
            }
            cancel()
        }
    }
}

做好上面的准备工作后,以下就是真正的Publish代码了。

// EventJob ...
type EventJob struct {
    event      Event
    handlers   []EventHandler
    resultChan chan JobStatus
}

//Publish ...
func (eb *EventBus) Publish(evt Event) <-chan JobStatus {
    eb.mu.RLock()
    defer eb.mu.RUnlock()
    if ehs, ok := eb.handlers[evt.ID()]; ok {
        handlers := make([]EventHandler, len(ehs))
        copy(handlers, ehs) //snapshot一份当时的消费者
        job := EventJob{
            event:      evt,
            handlers:   handlers,
            resultChan: make(chan JobStatus, 1),
        }

        var jobQueue = eb.eventJobQueue
        select {
        case jobQueue <- job:
        default:
        }

        return job.resultChan
    } else {
        err := fmt.Errorf("no handlers for event(%d)", evt.ID())
        resultChan := make(chan JobStatus, 1)
        resultChan <- JobStatus{
            Err: err,
        }
        return resultChan
    }
}

这里没有在eventWorker中直接从handlers中根据ID拿到相关的消费者,一方面是为了让eventWorker更加通用,另一方面也是为减少因为锁操作引起的阻塞。

至此,我们已经把最核心的代码一一拆解完成,完整代码,请参见channelx项目中的event_bus.go

使用示例

没有例子的工具类是不完整的,下面就提供一个例子。

先定义一个event,这里把id定义成私有的,然后在构造函数中,强制指定。

const ExampleEventID channelx.EventID = 1

type ExampleEvent struct {
    id channelx.EventID
}

func NewExampleEvent() ExampleEvent {
    return ExampleEvent{id:ExampleEventID}
}

func (evt ExampleEvent) ID() channelx.EventID  {
    return evt.id
}

接下来是event handler,需要根据实际的需要,在OnEvent中检查接收到的事件是否是订阅的事件,以及接收到事件结构是否能转换成特定的类型。在防御编程后,就可以处理事件逻辑了。

type ExampleHandler struct {
    logger channelx.Logger
}

func NewExampleHandler(logger channelx.Logger) *ExampleHandler {
    return &ExampleHandler{
        logger: logger,
    }
}

func (h ExampleHandler) Logger() channelx.Logger{
    return h.logger
}

func (h ExampleHandler) CanAutoRetry(err error) bool {
    return false
}

func (h ExampleHandler) OnEvent(ctx context.Context, event channelx.Event) error {
    if event.ID() != ExampleEventID {
        return fmt.Errorf("subscribe wrong event(%d)", event.ID())
    }

    _, ok := event.(ExampleEvent)
    if !ok {
        return fmt.Errorf("failed to convert received event to ExampleEvent")
    }

    // handle the event here
    h.Logger().Infof("event handled")

    return nil
}

最后,就是EventBus的启动,事件的订阅和发布了。

eventBus := channelx.NewEventBus(logger, "test", 4,4,2, time.Second, 5 * time.Second)
eventBus.Start()

handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())

写在最后

里面实现的轻量级util都开源在channelx,欢迎大家审阅,如果有你喜欢用的工具,欢迎点个赞或者star :)