生产者/消费者模型
简单来说,生产者生产一些数据,然后放到队列中,同时消费者从队列中取数据。这样就让生产和消费变成了异步的两个过程。当队列中没有数据时,消费者就进入饥饿的等待中;而队列中数据已满时,生产者则面临因产品积压导致 CPU 被剥夺的问题。
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// 生产者
func Producer(factor int, out chan <- int) {
for i := 0; ; i++ {
out <- i*factor
time.Sleep(time.Second)
}
}
// 消费者
func Consumer(in <- chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64)
go Producer(3, ch)
go Producer(5, ch)
go Consumer(ch)
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <- sig)
}
订阅/发布模型
订阅/发布(publish-subscribe)模型跟生产者/消费者模型有很多相似之处,发布者好比生产者,订阅者好比消费者。生产者和消费者是 M:N 的关系。而在传统生产者/消费者模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题。
// pubsub/pubsub.go
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 订阅者为一个通道
topicFunc func(v interface{}) bool // 主题为一个过滤器
)
type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存大小,如果订阅者通道中的消息超过 buffer,则会丢失该消息
timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者列表
}
// 构建一个发布者对象,可以设置发布超时时间和发布队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 发布一个主题。遍历所有订阅者,将消息发送到订阅者通道
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 发布主题,可以容忍一定的超时。其实就是将发布的消息发送到订阅者通道中
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
// 存在订阅回调,但是回调失败,直接返回
if topic != nil && !topic(v) {
return
}
// 将消息发送到订阅通道中,超时退出
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
// 关闭发布者对象,同时关闭所有的订阅者通道
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// main.go
package main
import (
"fmt"
"github.com/lwlwilliam/test/go/pubsub"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 5)
defer p.Close()
all := p.Subscribe()
world := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "world")
}
return false
})
p.Publish("Hello world 1")
p.Publish("Hello william 2")
p.Publish("Hello world 3")
p.Publish("Hello world 4")
p.Publish("Hello world 5")
p.Publish("Hello world 6") // 由于 all 通道满了,所以 all 会丢失该消息
p.Publish("Hello world 7") // 由于 world 通道也满了,所以 all 和 world 均会丢失该消息
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
}()
go func() {
for msg := range world {
fmt.Println("world:", msg)
}
}()
time.Sleep(3 * time.Second)
p.Publish("Hello world 8") // 正常情况下,睡眠 3 秒后,all 和 world 通道都清空缓存了,所以 all 和 world 均会订阅成功
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
生产者/消费者模型和订阅/发布模型的关系,类似于网络协议中的”pull”协议和”push”协议;生产者/消费者模型的数据传输不需要生产者发送给消费者,而是消费者主动”pull”数据;订阅/发布模型需要发布者将数据”push”给订阅者。