EventBus EventBus 是一个轻量级的事件发布/订阅框架,支持同步和异步发布消息,它可以简化 Go 协程之间的通信。
安装 确保计算机上已安装 Go(版本 1.18+)。在终端中输入以下命令:
go get github.com/werbenhu/eventbus
在项目中导入包
1 2 3 import ( "github.com/werbenhu/eventbus" )
EventBus 是什么? EventBus同时支持同步和异步的方式发布消息。EventBus使用一个Copy-On-Write的map管理handler和topic,所以不建议在有大量频繁的订阅和取消订阅的业务场景中使用。
异步的方式 在EventBus里,每个主题对应一个通道。Publish()
方法将消息推送到通道,Subscribe(
) 方法中的handler将处理从通道出来的消息。如果要使用带缓冲的EventBus,可以使用 eventbus.NewBuffered(bufferSize int)
方法创建带缓冲的EventBus,这样会为每个topic都创建一个带缓冲的channel。
同步的方式 同步的方式下EventBus不使用channel,而是通过直接调用handler将消息传递给订阅者。如果想同步的方式发布消息,使用eventbus.PublishSync()函数即可。
EventBus 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" "github.com/werbenhu/eventbus" ) func handler (topic string , payload int ) { fmt.Printf("topic:%s, payload:%d\n" , topic, payload) } func main () { bus := eventbus.New() bus.Subscribe("testtopic" , handler) bus.Publish("testtopic" , 100 ) bus.PublishSync("testtopic" , 200 ) time.Sleep(time.Millisecond) bus.Unsubscribe("testtopic" , handler) bus.Close() }
使用全局的EventBus单例对象 为了更方便的使用EventBus, 这里有一个全局的EventBus单例对象,使用eventbus.InitSingleton()
初始化这个单例对象,这个对象内部的channel是无缓冲的,直接使用eventbus.Subscribe()
,eventbus.Publish()
,eventbus.Unsubscribe()
,将会调用该单例对象对应的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func handler (topic string , payload int ) { fmt.Printf("topic:%s, payload:%d\n" , topic, payload) } func main () { eventbus.InitSingleton() eventbus.Subscribe("testtopic" , handler) var wg sync.WaitGroup wg.Add(1 ) go func () { for i := 0 ; i < 100 ; i++ { eventbus.Publish("testtopic" , i) } for i := 100 ; i < 200 ; i++ { eventbus.PublishSync("testtopic" , i) } wg.Done() }() wg.Wait() time.Sleep(time.Millisecond) eventbus.Unsubscribe("testtopic" , handler) eventbus.Close() }
使用Pipe代替Channel Pipe 将通道封装成泛型对象,泛型参数对应channle里的类型,这里没有主题的概念。eventbus.NewPipe[T]()
等价于 make(chan T)
,发布者发布消息,订阅者接收消息,可以使用 Pipe.Publish()
方法代替 chan <-
,使用 Pipe.Subscribe()
方法代替 <-chan
。如果有多个订阅者,则每个订阅者将接收到发布出来的每一条消息。
如果要使用带缓冲的通道,可以使用 eventbus.NewBufferedPipe[T](bufferSize int)
方法创建带缓冲的管道。Pipe同样支持同步和异步的方式发布消息。如果需要使用同步的方式,请调用Pipe.PublishSync()。
Pipe 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func handler1 (val string ) { fmt.Printf("handler1 val:%s\n" , val) } func handler2 (val string ) { fmt.Printf("handler2 val:%s\n" , val) } func main () { pipe := eventbus.NewPipe[string ]() pipe.Subscribe(handler1) pipe.Subscribe(handler2) var wg sync.WaitGroup wg.Add(1 ) go func () { for i := 0 ; i < 100 ; i++ { pipe.Publish(strconv.Itoa(i)) } for i := 100 ; i < 200 ; i++ { pipe.PublishSync(strconv.Itoa(i)) } wg.Done() }() wg.Wait() time.Sleep(time.Millisecond) pipe.Unsubscribe(handler1) pipe.Unsubscribe(handler2) pipe.Close() }