系统运行流程

区块链以消息为中心,通过发布/订阅服务,所有节点之间的数据同步成为可能。

注意publish执行后,消息将发给全网,包括本地节点,所以本地节点在接收到自己发的消息后,需要特别做好识别(自己发布的消息自己不应该处理)。

一般情况下,startNode启动一个节点时候,miner, fullNode应该是为true,miner可以为true可以为false。一般作为挖矿节点,不执行send交易命令,只有全节点才执行send交易命令,但这只是分工,事实上它也可以发起交易,分工的好处是,它可以专注于挖矿,将更多区块放一起集中挖矿。

FullNodesChanne 通道支持处理的消息支持类型:gettxfrompool,tx;

MiningChannel 通道支持处理的消息类型:inv(type为tx),tx;

GeneralChannel 通道支持处理的消息类型:block,getdata,inv(type为block),getblocks,version;

undefined1、开始

每一个网络节点,可能是挖矿节点,也可能是全节点,或者既是挖矿节点,也是全节点。

  1. // Network 节点的数据结构
  2. type Network struct {
  3. Host host.Host//主机
  4. GeneralChannel *Channel//通用节点
  5. MiningChannel *Channel//挖矿节点
  6. FullNodesChannel *Channel//全节点
  7. Blockchain *blockchain.Blockchain
  8. //Blocks和Transactions消息队列,用于存储新产生的Block或Transaction
  9. //一般而言,我们在主程序执行send交易过程中,根据send参数mineNow决定是否立即挖矿,mineNow为true则存储Block消息队列(立即挖矿)
  10. //mineNow为false则存储Transaction消息队列(不立即挖矿)。两个消息由节点在startNode后启动消息处理协程进行处理
  11. //一般情况下,为提高挖矿效率,我们会汇聚几个交易在一起,然后挖矿一次
  12. Blocks chan *blockchain.Block//Block类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)
  13. Transactions chan *blockchain.Transaction//Transaction类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)
  14. //是否是挖矿节点
  15. Miner bool
  16. }

通过startNode函数,实例化Network时候,将根据参数,对GeneralChannel、MiningChannel、FullNodesChannel全部进行实例化。

  1. //GeneralChannel 通道订阅消息
  2. generalChannel, _ := JoinChannel(ctx, pubsub, host.ID(), GeneralChannel, true)
  3. //如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息
  4. subscribe := false
  5. if miner {
  6. subscribe = true
  7. }
  8. miningChannel, _ := JoinChannel(ctx, pubsub, host.ID(), MiningChannel, subscribe)
  9. //如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息
  10. subscribe = false
  11. if fullNode {
  12. subscribe = true
  13. }
  14. fullNodesChannel, _ := JoinChannel(ctx, pubsub, host.ID(), FullNodesChannel, subscribe)

主程序为有界面命令控制程序,三个实例将作为参数传递给cliui:

  1. // 各通信通道建立命令行界面对象,监控来自三个通道实例的消息
  2. ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)

undefined2、所有对network的操作命令,将会通过generalChannel实例(其它实例可能没有实例化),publish到全网

对network发送的消息指令类型有:

  • block
  • inv
  • getblocks
  • getdata
  • tx
  • gettxfrompool
  • version

    如:发送inv命令:

  1. func (net *Network) SendInv(peerId string, _type string, items [][]byte) {
  2. inventory := Inv{net.Host.ID().Pretty(), _type, items}
  3. payload := GobEncode(inventory)
  4. request := append(CmdToBytes("inv"), payload...)
  5. net.GeneralChannel.Publish("接收到 inventory 命令", request, peerId)
  6. }

undefined3、这些事件消息,将在cliui中开协程进行处理:

  1. // HandleStream 处理来自全网发来的且可以处理的消息内容
  2. func (ui *CLIUI) HandleStream(net *Network, content *ChannelContent) {
  3. // ui.displayContent(content)
  4. if content.Payload != nil {
  5. command := BytesToCmd(content.Payload\[:commandLength\])
  6. log.Infof("Received %s command \\n", command)
  7. switch command {
  8. case "block":
  9. net.HandleBlocks(content)
  10. case "inv":
  11. net.HandleInv(content)
  12. case "getblocks":
  13. net.HandleGetBlocks(content)
  14. case "getdata":
  15. net.HandleGetData(content)
  16. case "tx":
  17. net.HandleTx(content)
  18. case "gettxfrompool":
  19. net.HandleGetTxFromPool(content)
  20. case "version":
  21. net.HandleVersion(content)
  22. default:
  23. log.Warn("Unknown Command")
  24. }
  25. }
  26. }

cliui的RUN函数,需要处理来自network的各种事件消息:

  1. // Run 开启协程处理各种事件消息
  2. func (ui *CLIUI) Run(net *Network) error {
  3. go ui.handleEvents(net)
  4. defer ui.end()
  5. return ui.app.Run()
  6. }

handleEvents:

  1. func (ui *CLIUI) handleEvents(net *Network) {
  2. peerRefreshTicker := time.NewTicker(time.Second)
  3. defer peerRefreshTicker.Stop()
  4. go ui.readFromLogs(net.Blockchain.InstanceId)
  5. log.Info("HOST ADDR: ", net.Host.Addrs())
  6. for {
  7. select {
  8. case input := <-ui.inputCh:
  9. err := ui.GeneralChannel.Publish(input, nil, "")//未指定消息接收者,意味着所有节点(peer)均会收到
  10. if err != nil {
  11. log.Errorf("Publish error: %s", err)
  12. }
  13. ui.displaySelfMessage(input)
  14. case <-peerRefreshTicker.C:
  15. // 定期刷新peers list
  16. ui.refreshPeers()
  17. case m := <-ui.GeneralChannel.Content://如果 GeneralChannel 收到消息
  18. ui.HandleStream(net, m)
  19. case m := <-ui.MiningChannel.Content://如果 MiningChannel 收到消息
  20. ui.HandleStream(net, m)
  21. case m := <-ui.FullNodesChannel.Content://如果 FullNodesChannel 收到消息
  22. ui.HandleStream(net, m)
  23. case <-ui.GeneralChannel.ctx.Done():
  24. return
  25. case <-ui.doneCh:
  26. return
  27. }
  28. }
  29. }

undefined4、一切从Inv(block)消息开始

每次挖出新区块,节点向全网广播inv(资产,在区块链里面inv包括block和交易池中的tx两类,但狭义上说,只有block)消息:

  1. func (net *Network) MineTx(memopoolTxs map[string]blockchain.Transaction) {
  2. var txs []\*blockchain.Transaction
  3. log.Infof("挖矿的交易数: %d", len(memopoolTxs))
  4. chain := net.Blockchain.ContinueBlockchain()
  5. for id := range memopoolTxs {
  6. log.Infof("tx: %s \\n", memopoolTxs[id].ID)
  7. tx := memopoolTxs[id\]
  8. log.Info("tx校验: ", chain.VerifyTransaction(&tx))
  9. if chain.VerifyTransaction(&tx) {
  10. txs = append(txs, &tx)
  11. }
  12. }
  13. if len(txs) == 0 {
  14. log.Info("无合法的交易")
  15. }
  16. cbTx := blockchain.MinerTx(MinerAddress, "")
  17. txs = append(txs, cbTx)
  18. newBlock := chain.MineBlock(txs)
  19. UTXOs := blockchain.UTXOSet{Blockchain:chain}
  20. UTXOs.Compute()
  21. log.Info("挖出新的区块")
  22. ** //peerId为空,SendInv发布给全网**
  23. net.SendInv("", "block", [][]byte{newBlock.Hash})
  24. memoryPool.ClearAll()//清除内存池中的全部交易
  25. memoryPool.Wg.Done()
  26. }

当然,收到请求消息getblocks后,节点应向请求方发一个block类型的inv:

  1. func (net *Network) HandleGetBlocks(content *ChannelContent) {
  2. var buff bytes.Buffer
  3. var payload GetBlocks
  4. buff.Write(content.Payload[commandLength:])
  5. dec := gob.NewDecoder(&buff)
  6. err := dec.Decode(&payload)
  7. if err != nil {
  8. log.Panic(err)
  9. }
  10. chain := net.Blockchain.ContinueBlockchain()
  11. blockHashes := chain.GetBlockHashes(payload.Height)
  12. log.Info("LENGTH:", len(blockHashes))
  13. net.SendInv(payload.SendFrom, "block", blockHashes)
  14. }

undefined4、每个network节点启动后做什么

  1. 1、向全网请求区块信息,以补全本地区块链
  2. // 每一个节点均有区块链的一个完整副本
  3. err = RequestBlocks(network)//全网发布version命令,接收到该命令的节点,要么将本地version回给它(接收者的区块链height更大),要么向它发送getblocks命令请求自己没有的区块(接收者的区块链height更小)
  4. 2、启用协程,处理来自自身网络节点事件**(只有启用了rpc才会有相关事件发生)**
  5. //本地的Blocks有了新的block加入队列,或者本地的Transactions有了新的tx加入队列,则将新的block或tx发布给请求者或全网(根据请求者是否为空)
  6. go HandleEvents(network)
  7. 3、如果是矿工节点,启用协程,不断发送ping命令给全节点
  8. if miner {
  9. // 矿工事件循环,以不断地发送一个 ping 给全节点,目的是得到新的交易,为新交易挖矿,并添加到区块链
  10. //矿工节点将定时向全网发送gettxfrompool指令,请求每个节点交易内存池(pending)的待挖矿交易
  11. go network.MinersEventLoop()
  12. }
  13. if err != nil {
  14. panic(err)
  15. }
  16. 4、运行UI界面,将在Run函数体中启动协程,通过三个通道,循环接收并处理全网节点publish的消息
  17. //(包括generalChannel, miningChannel, fullNodesChannel节点)
  18. if err = ui.Run(network); err != nil {
  19. log.Error("运行文字UI发生错误: %s", err)
  20. }