P2P区块链最简体验
undefined前言
作为最简准备,旨在熟悉libp2p library的使用方法。一个节点的区块链每隔5秒广播本地的区块链,启动监听,并在对端节点连接到后,将本地的区块链数据以json字符串形式写入网络,对端节点读取后,解析为区块链,如果解析而来的区块链长度长于本地的区块链,则直接更新本地区块链,否则丢弃。
libp2p很特别,采用mutiaddress,我们可以自定义系统的网络地址形式(称为网络协议),比如,在本文中,我们设计为:/ip4/127.0.0.1/tcp/listenPort,这个地址用于服务器监听,其中listenPort为服务器监听端口;另外一个地址是:/ipfs/ID,其中ID为全网唯一,唯一标识本服务器,用于被其它节点发现,例如:/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
实际改造我们的区块链时候,会有更细粒度的控制。
undefined导入所需的包
其中大部分的包来自于go-libp2p:
"github.com/davecgh/go-spew/spew"golog "github.com/ipfs/go-log"libp2p "github.com/libp2p/go-libp2p"crypto "github.com/libp2p/go-libp2p-crypto"host "github.com/libp2p/go-libp2p-host"net "github.com/libp2p/go-libp2p-net"peer "github.com/libp2p/go-libp2p-peer"pstore "github.com/libp2p/go-libp2p-peerstore"ma "github.com/multiformats/go-multiaddr"gologging "github.com/whyrusleeping/go-logging"
spew包是为了能够友好地打印区块链数据。
undefined创建一个LibP2P节点主机
// makeBasicHost 创建一个LibP2P主机//randseed:一个随机数,提供随机数创建主机,程序会更健壮//listenPort:监听端口// secio:是否对数据流进行加密,推荐打开func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {// 如果seed为0,使用真实的密码随机源,//否则,使用确定性随机性源,以使生成的密钥在多次运行中保持不变var r io.Readerif randseed == 0 {r = rand.Reader} else {r = mrand.New(mrand.NewSource(randseed))}// 为主机产生一对钥匙. 我们将使用它 来获得一个合法的主机IDpriv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)if err != nil {return nil, err}//选项opts := []libp2p.Option{libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),//监听地址和端口libp2p.Identity(priv),}//默认是开启的if !secio {opts = append(opts, libp2p.NoSecurity)}//利用相关参数,创建主机basicHost,获得全网唯一的IPFS ID,唯一标识本服务器节点basicHost, err := libp2p.New(context.Background(), opts...)if err != nil {return nil, err}// 建立主机的Multiaddr,libp2p使用一种独特的Mutliaddr,而非传统的IP+端口,用于节点之间互相发现hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))//服务器的ipfs地址,用于被其它节点发现// 现在,我们可以通过封装两个地址来构建一个可抵达主机的完整的Multiaddraddr := basicHost.Addrs()[0]fullAddr := addr.Encapsulate(hostAddr)log.Printf("I am %s\n", fullAddr)if secio {log.Printf("现在运行命令: \"go run main.go -l %d -d %s -secio\" 在一个不同的终端\n", listenPort+1, fullAddr)} else {log.Printf("现在运行命令: \"go run main.go -l %d -d %s\" 在一个不同的终端\n", listenPort+1, fullAddr)}return basicHost, nil}
undefined流处理
我们需要让我们的主机处理传入的数据流。既要处理读取,也要处理写操作。
在流处理中,本地需要对Blockchain使用互斥锁进行读写保护,此外,将数据写入到网络上也需要使用互斥锁
func handleStream(s net.Stream) {log.Println("Got a new stream!")// bufio为非阻塞的读和写操作创建一个读写缓冲流rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))go readData(rw)go writeData(rw)//流将保持打开直到你关闭它(或者其它方关闭它)}
undefined读取
func readData(rw *bufio.ReadWriter) {for {//无限循环,永不停歇地读取外面进来的数据//我们使用`ReadString`解析从其它节点发送过来的新的区块链(JSON字符串)。str, err := rw.ReadString('\n')//以'\n'为分隔符读取。返回的是字符串拷贝。if err != nil {//只有一种情况会返回err:没有遇到分隔符。log.Fatal(err)}if str == "" {//没有读取到任何数据return}if str != "\n" {chain := make([]Block, 0)//make只用于映射、切片和程道,不返回指针,这里创建一个[]Block类型的切片if err := json.Unmarshal([]byte(str), &chain); err != nil {//json转为结构对象log.Fatal(err)}mutex.Lock()//独占互斥锁,保护的是Blockchainif len(chain) > len(Blockchain) {//如果流中解析的区块链长度大于本地区块链的长度,替换本地区块链为读取的区块链Blockchain = chainbytes, err := json.MarshalIndent(Blockchain, "", " ")//缩进一个空格,对象Blockchain转为jsonif err != nil {log.Fatal(err)}fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))}mutex.Unlock()}}}
undefined写
我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,它们会接受并扔掉。如果更长,他们会接受的。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。
func writeData(rw *bufio.ReadWriter) {go func() {//每隔5秒广播我们的区块链的最新状态给我们的对等体for {time.Sleep(5 * time.Second)mutex.Lock()//互斥锁,保护Blockchainbytes, err := json.Marshal(Blockchain)//本地Blockchain转为json字符串if err != nil {log.Println(err)}mutex.Unlock()mutex.Lock()//互斥锁,独占rwrw.WriteString(fmt.Sprintf("%s\n", string(bytes)))rw.Flush()//将缓存的数据真正写入到网络上mutex.Unlock()}}()stdReader := bufio.NewReader(os.Stdin)//从终端读取待发送到信息(心率数)for {//无限循环,随时读取终端填写的心律数据,发送到网上fmt.Print("> ")sendData, err := stdReader.ReadString('\n')if err != nil {log.Fatal(err)}sendData = strings.Replace(sendData, "\n", "", -1)bpm, err := strconv.Atoi(sendData)//将读取的字符串转为数字(心率为数字)if err != nil {log.Fatal(err)}newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)//创建区块if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {mutex.Lock()//凡是读或写Blockchain,均需要开启互斥锁Blockchain = append(Blockchain, newBlock)//将新区块加入到区块链mutex.Unlock()}mutex.Lock()bytes, err := json.Marshal(Blockchain)//读取本地区块,转为jsonif err != nil {log.Println(err)}mutex.Unlock()//使用spew.Dump 这个函数可以以非常美观和方便阅读的方式将 struct、slice 等数据打印在控制台里,方便我们调试spew.Dump(Blockchain)mutex.Lock()//互斥锁,独占rwrw.WriteString(fmt.Sprintf("%s\n", string(bytes)))rw.Flush()//将本地区块数据写入到网络mutex.Unlock()}}
secio是否允许安全传输。我们会一直把这个开关打开的。target指定想要连接的host地址,这里我们其实扮演的节点去连接其他host。listenF打开指定端口让其他节点连接,这里我们扮演的host。我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,他们会接受并扔掉。如果更长,他们会接受并更新本地的区块链。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。
我们进行一些字符串操作,以确保输入的BPM是一个整数,并且格式正确,可以添加为新块。我们通过我们的标准BangLink函数(见上面的“Blockchain stuff”部分)。然后,我们
Marshal它,使它看起来漂亮,打印到我们的控制台,用spew.Dump验证。然后我们用rw.WriteString将它广播到我们的连接的对等体。创建了我们的处理程序和读写逻辑来处理输入和输出的块链。通过这些函数,我们已经为每个对等点创建了一种方法,以连续地相互检查其块链的状态,并且在同一时间,它们都被更新到最新状态(最长的有效块链)。
undefinedmain
t := time.Now()genesisBlock := Block{}genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}Blockchain = append(Blockchain, genesisBlock)// LibP2P 使用 golog记录消息日志. 我们可以控制日志的详细程度golog.SetAllLoggers(gologging.INFO) // 变更为 DEBUG 可以获得额外的信息// 从命令行解析出选项listenF := flag.Int("l", 0, "等待到来的连接")target := flag.String("d", "", "连接的目标节点")secio := flag.Bool("secio", false, "启用 secio")seed := flag.Int64("seed", 0, "设定产生ID的随机种子")flag.Parse()if *listenF == 0 {log.Fatal("请提供绑定的端口 -l")}// 创建一个主机ha, err := makeBasicHost(*listenF, *secio, *seed)if err != nil {log.Fatal(err)}//target为我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方if *target == "" {//只充当主机log.Println("listening for connections")// 将流处理器设定在主机: A。 /p2p/1.0.0 是一个用户自定义的协议ha.SetStreamHandler("/p2p/1.0.0", handleStream)select {} // 一直挂起} else {//作为主机的对等方,连接到target主机ha.SetStreamHandler("/p2p/1.0.0", handleStream)//对等端仍然要开启监听,接受其它节点的连接//**下面的代码,是作为对等端B主机所做的工作:连接到A主机节点,执行读和写,实现两个节点之间的网络通信**// 下面的代码从目标节点的mutiaddress中展开获得节点的IDipfsaddr, err := ma.NewMultiaddr(*target)if err != nil {log.Fatalln(err)}pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)if err != nil {log.Fatalln(err)}peerid, err := peer.IDB58Decode(pid)//将58位的string转化为idif err != nil {log.Fatalln(err)}// 从目标主机解封装 /ipfs/<peerID>// /ip4/<a.b.c.d>/ipfs/<peer> 变成 /ip4/<a.b.c.d>targetPeerAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)// 我们有了一个节点ID和targetAddr,将它添加到peerstore// 这样LibP2就知道如何联系到它ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)log.Println("正在打开B到A的网络流...")// 创建一个新的从主机B到A的流// 它应当被主机A通过我们上面设定的处理器进行处理// 因为我们使用相同的 /p2p/1.0.0 协议s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")if err != nil {log.Fatalln(err)}// 创建一个新的缓冲流,这样读和写将不会阻塞rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))// 创建一个线程读和写数据go writeData(rw)go readData(rw)select {} // 永远挂起}
我们设置所有的命令行标志。
secio我们以前覆盖并允许安全流。我们将确保在运行程序时始终使用这个标志。target让我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方。listenF打开了我们希望允许连接的端口,这意味着我们作为主机。我们既可以是主机(接收连接),也可以是对等体(连接到其他主机)。这就是这个系统真正成为P2P的原因!seed是可选的随机播种器,用来构造我们的地址,其他节点可以用来连接我们。然后,我们创建了一个新的主机,我们之前创建了
makeBasicHost函数。如果我们只充当主机(即,我们没有连接到其他主机),我们指定如果*target==“”,则使用我们之前创建的setStreamHandle函数启动处理程序,这是我们的监听器代码的结束。如果我们确实想要连接到另一个主机,我们移动到
else部分。我们再次设置我们的处理程序,因为我们作为一个主机和一个连接的对等体。接下来的几行解构了我们提供给目标的字符串,这样我们就可以找到我们想要连接的主机。这也被称为解封装。
我们最终得到要连接的主机的
peerID和目标地址targetAddr,并将该记录添加到“存储”中,以便跟踪我们与谁连接。然后,我们使用
ha.NewStream创建想要连接到的对等体连接。我们希望能够接收和发送数据流(我们的区块链),因此就像我们在处理程序中做的那样,我们创建一个ReadWriter,并为readData和writeData创建单独的Go例程。最后我们通过空的select来阻塞程序,这样程序不会停止。
