系统运行流程
区块链以消息为中心,通过发布/订阅服务,所有节点之间的数据同步成为可能。
注意publish执行后,消息将发给全网,包括本地节点,所以本地节点在接收到自己发的消息后,需要特别做好识别(自己发布的消息自己不应该处理)。
一般情况下,startNode启动一个节点时候,miner, fullNode应该是为true,miner可以为true可以为false。一般作为挖矿节点,不执行send交易命令,只有全节点才执行send交易命令,但这只是分工,事实上它也可以发起交易,分工的好处是,它可以专注于挖矿,将更多区块放一起集中挖矿。
FullNodesChanne 通道支持处理的消息支持类型:gettxfrompool,tx;
MiningChannel 通道支持处理的消息类型:inv(type为tx),tx;
GeneralChannel 通道支持处理的消息类型:block,getdata,inv(type为block),getblocks,version;
undefined1、开始
每一个网络节点,可能是挖矿节点,也可能是全节点,或者既是挖矿节点,也是全节点。
// Network 节点的数据结构type Network struct {Host host.Host//主机GeneralChannel *Channel//通用节点MiningChannel *Channel//挖矿节点FullNodesChannel *Channel//全节点Blockchain *blockchain.Blockchain//Blocks和Transactions消息队列,用于存储新产生的Block或Transaction//一般而言,我们在主程序执行send交易过程中,根据send参数mineNow决定是否立即挖矿,mineNow为true则存储Block消息队列(立即挖矿)//mineNow为false则存储Transaction消息队列(不立即挖矿)。两个消息由节点在startNode后启动消息处理协程进行处理//一般情况下,为提高挖矿效率,我们会汇聚几个交易在一起,然后挖矿一次Blocks chan *blockchain.Block//Block类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)Transactions chan *blockchain.Transaction//Transaction类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)//是否是挖矿节点Miner bool}
通过startNode函数,实例化Network时候,将根据参数,对GeneralChannel、MiningChannel、FullNodesChannel全部进行实例化。
//GeneralChannel 通道订阅消息generalChannel, _ := JoinChannel(ctx, pubsub, host.ID(), GeneralChannel, true)//如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息subscribe := falseif miner {subscribe = true}miningChannel, _ := JoinChannel(ctx, pubsub, host.ID(), MiningChannel, subscribe)//如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息subscribe = falseif fullNode {subscribe = true}fullNodesChannel, _ := JoinChannel(ctx, pubsub, host.ID(), FullNodesChannel, subscribe)
主程序为有界面命令控制程序,三个实例将作为参数传递给cliui:
// 各通信通道建立命令行界面对象,监控来自三个通道实例的消息ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)
undefined2、所有对network的操作命令,将会通过generalChannel实例(其它实例可能没有实例化),publish到全网
对network发送的消息指令类型有:
- block
- inv
- getblocks
- getdata
- tx
- gettxfrompool
version
如:发送inv命令:
func (net *Network) SendInv(peerId string, _type string, items [][]byte) {inventory := Inv{net.Host.ID().Pretty(), _type, items}payload := GobEncode(inventory)request := append(CmdToBytes("inv"), payload...)net.GeneralChannel.Publish("接收到 inventory 命令", request, peerId)}
undefined3、这些事件消息,将在cliui中开协程进行处理:
// HandleStream 处理来自全网发来的且可以处理的消息内容func (ui *CLIUI) HandleStream(net *Network, content *ChannelContent) {// ui.displayContent(content)if content.Payload != nil {command := BytesToCmd(content.Payload\[:commandLength\])log.Infof("Received %s command \\n", command)switch command {case "block":net.HandleBlocks(content)case "inv":net.HandleInv(content)case "getblocks":net.HandleGetBlocks(content)case "getdata":net.HandleGetData(content)case "tx":net.HandleTx(content)case "gettxfrompool":net.HandleGetTxFromPool(content)case "version":net.HandleVersion(content)default:log.Warn("Unknown Command")}}}
cliui的RUN函数,需要处理来自network的各种事件消息:
// Run 开启协程处理各种事件消息func (ui *CLIUI) Run(net *Network) error {go ui.handleEvents(net)defer ui.end()return ui.app.Run()}
handleEvents:
func (ui *CLIUI) handleEvents(net *Network) {peerRefreshTicker := time.NewTicker(time.Second)defer peerRefreshTicker.Stop()go ui.readFromLogs(net.Blockchain.InstanceId)log.Info("HOST ADDR: ", net.Host.Addrs())for {select {case input := <-ui.inputCh:err := ui.GeneralChannel.Publish(input, nil, "")//未指定消息接收者,意味着所有节点(peer)均会收到if err != nil {log.Errorf("Publish error: %s", err)}ui.displaySelfMessage(input)case <-peerRefreshTicker.C:// 定期刷新peers listui.refreshPeers()case m := <-ui.GeneralChannel.Content://如果 GeneralChannel 收到消息ui.HandleStream(net, m)case m := <-ui.MiningChannel.Content://如果 MiningChannel 收到消息ui.HandleStream(net, m)case m := <-ui.FullNodesChannel.Content://如果 FullNodesChannel 收到消息ui.HandleStream(net, m)case <-ui.GeneralChannel.ctx.Done():returncase <-ui.doneCh:return}}}
undefined4、一切从Inv(block)消息开始
每次挖出新区块,节点向全网广播inv(资产,在区块链里面inv包括block和交易池中的tx两类,但狭义上说,只有block)消息:
func (net *Network) MineTx(memopoolTxs map[string]blockchain.Transaction) {var txs []\*blockchain.Transactionlog.Infof("挖矿的交易数: %d", len(memopoolTxs))chain := net.Blockchain.ContinueBlockchain()for id := range memopoolTxs {log.Infof("tx: %s \\n", memopoolTxs[id].ID)tx := memopoolTxs[id\]log.Info("tx校验: ", chain.VerifyTransaction(&tx))if chain.VerifyTransaction(&tx) {txs = append(txs, &tx)}}if len(txs) == 0 {log.Info("无合法的交易")}cbTx := blockchain.MinerTx(MinerAddress, "")txs = append(txs, cbTx)newBlock := chain.MineBlock(txs)UTXOs := blockchain.UTXOSet{Blockchain:chain}UTXOs.Compute()log.Info("挖出新的区块")** //peerId为空,SendInv发布给全网**net.SendInv("", "block", [][]byte{newBlock.Hash})memoryPool.ClearAll()//清除内存池中的全部交易memoryPool.Wg.Done()}
当然,收到请求消息getblocks后,节点应向请求方发一个block类型的inv:
func (net *Network) HandleGetBlocks(content *ChannelContent) {var buff bytes.Buffervar payload GetBlocksbuff.Write(content.Payload[commandLength:])dec := gob.NewDecoder(&buff)err := dec.Decode(&payload)if err != nil {log.Panic(err)}chain := net.Blockchain.ContinueBlockchain()blockHashes := chain.GetBlockHashes(payload.Height)log.Info("LENGTH:", len(blockHashes))net.SendInv(payload.SendFrom, "block", blockHashes)}
undefined4、每个network节点启动后做什么
1、向全网请求区块信息,以补全本地区块链// 每一个节点均有区块链的一个完整副本err = RequestBlocks(network)//全网发布version命令,接收到该命令的节点,要么将本地version回给它(接收者的区块链height更大),要么向它发送getblocks命令请求自己没有的区块(接收者的区块链height更小)2、启用协程,处理来自自身网络节点事件**(只有启用了rpc才会有相关事件发生)**//本地的Blocks有了新的block加入队列,或者本地的Transactions有了新的tx加入队列,则将新的block或tx发布给请求者或全网(根据请求者是否为空)go HandleEvents(network)3、如果是矿工节点,启用协程,不断发送ping命令给全节点if miner {// 矿工事件循环,以不断地发送一个 ping 给全节点,目的是得到新的交易,为新交易挖矿,并添加到区块链//矿工节点将定时向全网发送gettxfrompool指令,请求每个节点交易内存池(pending)的待挖矿交易go network.MinersEventLoop()}if err != nil {panic(err)}4、运行UI界面,将在Run函数体中启动协程,通过三个通道,循环接收并处理全网节点publish的消息//(包括generalChannel, miningChannel, fullNodesChannel节点)if err = ui.Run(network); err != nil {log.Error("运行文字UI发生错误: %s", err)}
