• 52389

    文章

  • 521

    评论

  • 43

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

IPFS Series -- Pubsub

695856371Web网页设计师②群 | 喜欢本站的朋友可以收藏本站,或者加入我们大家一起来交流技术!

欢迎来到梁钟霖个人博客网站。本个人博客网站提供最新的站长新闻,各种互联网资讯。 还提供个人博客模板,最新最全的java教程,java面试题。在此我将尽我最大所能将此个人博客网站做的最好! 谢谢大家,愿大家一起进步!

Pubsub的源程序在这里。 其库包的大致说明在这里

pubsub功能目前还属于ipfs的一个实验性质的功能,如果要开启pubsub功能,在启动ipfs daempon的时候需要指定参数: --enable-pubsub-experiment。

ipfs daemon --enable-pubsub-experiment

具体的启动参见这里

数据结构

// PubSub 是topics的集合
type PubSub struct {
    cmdChan  chan cmd  //用来接收命令/cmd的Channel
    capacity int       //Channel的容量 
}

cmdChan用来接收命令,capacity指定这个Pubsub下每一个topic的容最大容量。

type cmd struct {
    op     operation
    topics []string
    ch     chan interface{}
    msg    interface{}
}

接收的cmd的op可以是:sub,subOnce,subOnceEach,pub,unsub,unsubAll,closeTopic,shutdown。

type registry struct {
    topics    map[string]map[chan interface{}]subtype
    revTopics map[chan interface{}]map[string]bool
}


topics:

Topic名 -->[消息的Channel-->子类型]

revTopics:

消息的Channel-->[topic名 --> bool]

 

程序分析

PubSub

func (ps *PubSub) Sub(topics ...string) chan interface{} 
func (ps *PubSub) SubOnce(topics ...string) chan interface{} 
func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} 
func (ps *PubSub) sub(op operation, topics ...string) chan interface{} 
func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) 
func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) 
func (ps *PubSub) Pub(msg interface{}, topics ...string) 
func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) 
func (ps *PubSub) Close(topics ...string) 
func (ps *PubSub) Shutdown() 

上面所有的函数都是向Pubsub的cmdChan发送相应的命令。

Pubsub类中最重要的函数就是Start()

func (ps *PubSub) start() {
    reg := registry{
        topics:    make(map[string]map[chan interface{}]subtype),
        revTopics: make(map[chan interface{}]map[string]bool),
    }

loop:
    for cmd := range ps.cmdChan {
       if cmd.topics == nil {
          switch cmd.op {
          case unsubAll:
                reg.removeChannel(cmd.ch)

          case shutdown:
                break loop
          }

          continue loop
       }

       for _, topic := range cmd.topics {
           switch cmd.op {
           case sub:
                reg.add(topic, cmd.ch, stNorm)

           case subOnce:
                reg.add(topic, cmd.ch, stOnceAny)

           case subOnceEach:
                reg.add(topic, cmd.ch, stOnceEach)

           case pub:
                reg.send(topic, cmd.msg)

           case unsub:
                reg.remove(topic, cmd.ch)

           case closeTopic:
                reg.removeTopic(topic)
           }
        }
    }

    for topic, chans := range reg.topics {
        for ch, _ := range chans {
            reg.remove(topic, ch)
        }
    }
}

程序比较简单直接,一个死循环:首先用For...range来监视CmdChan Channel。如果没有收到命令,则等待。如果有收到Command,就根据Command的OP来做相应的处理。一个Pubsub类只有一个registry对象,用来记录相应的topics和channel/通道

 

Registry

func (reg *registry) add(topic string, ch chan interface{}, st subtype) 
将相应的topic和Channel信息加入registry对象

func (reg *registry) send(topic string, msg interface{})
根据传入的topic,给相应的channel发送 msg。同时根据topic的subtype,做相应的处理。
比如:stOnceAny和stOnceEach,删除相应的channel或者topic

func (reg *registry) removeTopic(topic string)
删除topic,调用下面的remove()实现

func (reg *registry) removeChannel(ch chan interface{})
删除Channel,调用下面的remove()实现

func (reg *registry) remove(topic string, ch chan interface{})
删除相应的topic和Channel

 

案例

使用pubsub功能有下面两个应用案例,博主会在后续的系列中进行分析。

ipfs-shipyard/ipfs-pubsub-room​github.com图标

eolszewski/ipfs-pubsub-chatroom​github.com图标

 

目前IPFS上有两个标杆应用是基于pubsub功能进行搭建的。

  • orbit-db: 分布式数据库
  • Orbit: 点对点的聊天工具

 转载至链接:https://my.oschina.net/gavinzheng731/blog/3007754。


转载原创文章请注明出处,转载至: 梁钟霖个人博客www.liangzl.com

您觉喜欢本网站,或者觉得本文章对您有帮助,那么可以选择打赏。
打赏多少,您高兴就行,谢谢您对梁钟霖这小子的支持! ~(@^_^@)~

  • 微信扫一扫

  • 支付宝扫一扫

    支付宝打赏

0条评论

Loading...


发表评论

电子邮件地址不会被公开。 必填项已用*标注

自定义皮肤
注册梁钟霖个人博客