一个轻量级的事件发布/订阅框架,它可以简化 Go 协程之间的通信
2023-05-03 10:47:24 # Go

EventBus

EventBus 是一个轻量级的事件发布/订阅框架,支持同步和异步发布消息,它可以简化 Go 协程之间的通信。

安装

确保计算机上已安装 Go(版本 1.18+)。在终端中输入以下命令:

go get github.com/werbenhu/eventbus

在项目中导入包

1
2
3
import (
"github.com/werbenhu/eventbus"
)

EventBus 是什么?

EventBus同时支持同步和异步的方式发布消息。EventBus使用一个Copy-On-Write的map管理handler和topic,所以不建议在有大量频繁的订阅和取消订阅的业务场景中使用。

异步的方式

在EventBus里,每个主题对应一个通道。Publish() 方法将消息推送到通道,Subscribe() 方法中的handler将处理从通道出来的消息。如果要使用带缓冲的EventBus,可以使用 eventbus.NewBuffered(bufferSize int) 方法创建带缓冲的EventBus,这样会为每个topic都创建一个带缓冲的channel。

同步的方式

同步的方式下EventBus不使用channel,而是通过直接调用handler将消息传递给订阅者。如果想同步的方式发布消息,使用eventbus.PublishSync()函数即可。

EventBus 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"time"

"github.com/werbenhu/eventbus"
)

func handler(topic string, payload int) {
fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}

func main() {
bus := eventbus.New()

// Subscribe() 订阅一个主题,如果handler不是函数则返回错误。
// handler必须有两个参数:第一个参数必须是字符串类型,
// handler的第二个参数类型必须与 `Publish()` 中的 payload 类型一致。
bus.Subscribe("testtopic", handler)

// 异步方式发布消息
bus.Publish("testtopic", 100)

// 同步方式发布消息
bus.PublishSync("testtopic", 200)

// 订阅者接收消息。为了确保订阅者可以接收完所有消息的异步消息,这里在取消订阅之前给了一点延迟。
time.Sleep(time.Millisecond)
bus.Unsubscribe("testtopic", handler)
bus.Close()
}

使用全局的EventBus单例对象

为了更方便的使用EventBus, 这里有一个全局的EventBus单例对象,使用eventbus.InitSingleton() 初始化这个单例对象,这个对象内部的channel是无缓冲的,直接使用eventbus.Subscribe(),eventbus.Publish(),eventbus.Unsubscribe(),将会调用该单例对象对应的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func handler(topic string, payload int) {
fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}

func main() {

// 初始化单例对象
eventbus.InitSingleton()

// eventbus.Subscribe() 将调用全局单例singleton.Subscribe()方法
eventbus.Subscribe("testtopic", handler)

var wg sync.WaitGroup
wg.Add(1)
go func() {
// 异步方式发布消息
for i := 0; i < 100; i++ {
// 调用全局单例singleton.Publish()方法
eventbus.Publish("testtopic", i)
}
// 同步方式发布消息
for i := 100; i < 200; i++ {
// 调用全局单例singleton.PublishSync()方法
eventbus.PublishSync("testtopic", i)
}
wg.Done()
}()
wg.Wait()

time.Sleep(time.Millisecond)
// eventbus.Unsubscribe() 将调用全局单例singleton.Unsubscribe()方法
eventbus.Unsubscribe("testtopic", handler)

// eventbus.Close() 将调用全局单例singleton.Close()方法
eventbus.Close()
}

使用Pipe代替Channel

Pipe 将通道封装成泛型对象,泛型参数对应channle里的类型,这里没有主题的概念。
eventbus.NewPipe[T]() 等价于 make(chan T),发布者发布消息,订阅者接收消息,可以使用 Pipe.Publish() 方法代替 chan <-,使用 Pipe.Subscribe() 方法代替 <-chan。如果有多个订阅者,则每个订阅者将接收到发布出来的每一条消息。

如果要使用带缓冲的通道,可以使用 eventbus.NewBufferedPipe[T](bufferSize int) 方法创建带缓冲的管道。Pipe同样支持同步和异步的方式发布消息。如果需要使用同步的方式,请调用Pipe.PublishSync()。

Pipe 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func handler1(val string) {
fmt.Printf("handler1 val:%s\n", val)
}

func handler2(val string) {
fmt.Printf("handler2 val:%s\n", val)
}

func main() {
pipe := eventbus.NewPipe[string]()
pipe.Subscribe(handler1)
pipe.Subscribe(handler2)

var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 100; i++ {
pipe.Publish(strconv.Itoa(i))
}
for i := 100; i < 200; i++ {
pipe.PublishSync(strconv.Itoa(i))
}
wg.Done()
}()
wg.Wait()

time.Sleep(time.Millisecond)
pipe.Unsubscribe(handler1)
pipe.Unsubscribe(handler2)
pipe.Close()
}
上一页
2023-05-03 10:47:24 # Go