生产者-消费者模型与订阅-发布模型

2021/03/24 计算机原理

生产者/消费者模型

简单来说,生产者生产一些数据,然后放到队列中,同时消费者从队列中取数据。这样就让生产和消费变成了异步的两个过程。当队列中没有数据时,消费者就进入饥饿的等待中;而队列中数据已满时,生产者则面临因产品积压导致 CPU 被剥夺的问题。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// 生产者
func Producer(factor int, out chan <- int) {
    for i := 0; ; i++ {
        out <- i*factor
        time.Sleep(time.Second)
    }
}

// 消费者
func Consumer(in <- chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int, 64)
    go Producer(3, ch)
    go Producer(5, ch)
    go Consumer(ch)

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <- sig)
}

订阅/发布模型

订阅/发布(publish-subscribe)模型跟生产者/消费者模型有很多相似之处,发布者好比生产者,订阅者好比消费者。生产者和消费者是 M:N 的关系。而在传统生产者/消费者模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题。

// pubsub/pubsub.go
package pubsub

import (
    "sync"
    "time"
)

type (
    subscriber chan interface{}         // 订阅者为一个通道
    topicFunc  func(v interface{}) bool // 主题为一个过滤器
)

type Publisher struct {
    m           sync.RWMutex             // 读写锁
    buffer      int                      // 订阅队列的缓存大小,如果订阅者通道中的消息超过 buffer,则会丢失该消息
    timeout     time.Duration            // 发布超时时间
    subscribers map[subscriber]topicFunc // 订阅者列表
}

// 构建一个发布者对象,可以设置发布超时时间和发布队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    return &Publisher{
        buffer:      buffer,
        timeout:     publishTimeout,
        subscribers: make(map[subscriber]topicFunc),
    }
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
    return p.SubscribeTopic(nil)
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    ch := make(chan interface{}, p.buffer)
    p.m.Lock()
    p.subscribers[ch] = topic
    p.m.Unlock()
    return ch
}

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
    p.m.Lock()
    defer p.m.Unlock()
    delete(p.subscribers, sub)
    close(sub)
}

// 发布一个主题。遍历所有订阅者,将消息发送到订阅者通道
func (p *Publisher) Publish(v interface{}) {
    p.m.RLock()
    defer p.m.RUnlock()
    var wg sync.WaitGroup
    for sub, topic := range p.subscribers {
        wg.Add(1)
        go p.sendTopic(sub, topic, v, &wg)
    }
    wg.Wait()
}

// 发布主题,可以容忍一定的超时。其实就是将发布的消息发送到订阅者通道中
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
    defer wg.Done()
    // 存在订阅回调,但是回调失败,直接返回
    if topic != nil && !topic(v) {
        return
    }

    // 将消息发送到订阅通道中,超时退出
    select {
    case sub <- v:
    case <-time.After(p.timeout):
    }
}

// 关闭发布者对象,同时关闭所有的订阅者通道
func (p *Publisher) Close() {
    p.m.Lock()
    defer p.m.Unlock()
    for sub := range p.subscribers {
        delete(p.subscribers, sub)
        close(sub)
    }
}
// main.go
package main

import (
    "fmt"
    "github.com/lwlwilliam/test/go/pubsub"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"
)

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 5)
    defer p.Close()
    all := p.Subscribe()

    world := p.SubscribeTopic(func(v interface{}) bool {
        if s, ok := v.(string); ok {
            return strings.Contains(s, "world")
        }
        return false
    })

    p.Publish("Hello world 1")
    p.Publish("Hello william 2")
    p.Publish("Hello world 3")
    p.Publish("Hello world 4")
    p.Publish("Hello world 5")
    p.Publish("Hello world 6") // 由于 all 通道满了,所以 all 会丢失该消息
    p.Publish("Hello world 7") // 由于 world 通道也满了,所以 all 和 world 均会丢失该消息

    go func() {
        for msg := range all {
            fmt.Println("all:", msg)
        }
    }()

    go func() {
        for msg := range world {
            fmt.Println("world:", msg)
        }
    }()

    time.Sleep(3 * time.Second)
    p.Publish("Hello world 8") // 正常情况下,睡眠 3 秒后,all 和 world 通道都清空缓存了,所以 all 和 world 均会订阅成功

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
}

生产者/消费者模型和订阅/发布模型的关系,类似于网络协议中的”pull”协议和”push”协议;生产者/消费者模型的数据传输不需要生产者发送给消费者,而是消费者主动”pull”数据;订阅/发布模型需要发布者将数据”push”给订阅者。

Search

    Table of Contents