P2P区块链最简体验

undefined前言

作为最简准备,旨在熟悉libp2p library的使用方法。一个节点的区块链每隔5秒广播本地的区块链,启动监听,并在对端节点连接到后,将本地的区块链数据以json字符串形式写入网络,对端节点读取后,解析为区块链,如果解析而来的区块链长度长于本地的区块链,则直接更新本地区块链,否则丢弃。

libp2p很特别,采用mutiaddress,我们可以自定义系统的网络地址形式(称为网络协议),比如,在本文中,我们设计为:/ip4/127.0.0.1/tcp/listenPort,这个地址用于服务器监听,其中listenPort为服务器监听端口;另外一个地址是:/ipfs/ID,其中ID为全网唯一,唯一标识本服务器,用于被其它节点发现,例如:/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N

实际改造我们的区块链时候,会有更细粒度的控制。

undefined导入所需的包

其中大部分的包来自于go-libp2p

  1. "github.com/davecgh/go-spew/spew"
  2. golog "github.com/ipfs/go-log"
  3. libp2p "github.com/libp2p/go-libp2p"
  4. crypto "github.com/libp2p/go-libp2p-crypto"
  5. host "github.com/libp2p/go-libp2p-host"
  6. net "github.com/libp2p/go-libp2p-net"
  7. peer "github.com/libp2p/go-libp2p-peer"
  8. pstore "github.com/libp2p/go-libp2p-peerstore"
  9. ma "github.com/multiformats/go-multiaddr"
  10. gologging "github.com/whyrusleeping/go-logging"

spew包是为了能够友好地打印区块链数据。

undefined创建一个LibP2P节点主机

  1. // makeBasicHost 创建一个LibP2P主机
  2. //randseed:一个随机数,提供随机数创建主机,程序会更健壮
  3. //listenPort:监听端口
  4. // secio:是否对数据流进行加密,推荐打开
  5. func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {
  6. // 如果seed为0,使用真实的密码随机源,
  7. //否则,使用确定性随机性源,以使生成的密钥在多次运行中保持不变
  8. var r io.Reader
  9. if randseed == 0 {
  10. r = rand.Reader
  11. } else {
  12. r = mrand.New(mrand.NewSource(randseed))
  13. }
  14. // 为主机产生一对钥匙. 我们将使用它 来获得一个合法的主机ID
  15. priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
  16. if err != nil {
  17. return nil, err
  18. }
  19. //选项
  20. opts := []libp2p.Option{
  21. libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),//监听地址和端口
  22. libp2p.Identity(priv),
  23. }
  24. //默认是开启的
  25. if !secio {
  26. opts = append(opts, libp2p.NoSecurity)
  27. }
  28. //利用相关参数,创建主机basicHost,获得全网唯一的IPFS ID,唯一标识本服务器节点
  29. basicHost, err := libp2p.New(context.Background(), opts...)
  30. if err != nil {
  31. return nil, err
  32. }
  33. // 建立主机的Multiaddr,libp2p使用一种独特的Mutliaddr,而非传统的IP+端口,用于节点之间互相发现
  34. hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))//服务器的ipfs地址,用于被其它节点发现
  35. // 现在,我们可以通过封装两个地址来构建一个可抵达主机的完整的Multiaddr
  36. addr := basicHost.Addrs()[0]
  37. fullAddr := addr.Encapsulate(hostAddr)
  38. log.Printf("I am %s\n", fullAddr)
  39. if secio {
  40. log.Printf("现在运行命令: \"go run main.go -l %d -d %s -secio\" 在一个不同的终端\n", listenPort+1, fullAddr)
  41. } else {
  42. log.Printf("现在运行命令: \"go run main.go -l %d -d %s\" 在一个不同的终端\n", listenPort+1, fullAddr)
  43. }
  44. return basicHost, nil
  45. }

undefined流处理

我们需要让我们的主机处理传入的数据流。既要处理读取,也要处理写操作。

在流处理中,本地需要对Blockchain使用互斥锁进行读写保护,此外,将数据写入到网络上也需要使用互斥锁

  1. func handleStream(s net.Stream) {
  2. log.Println("Got a new stream!")
  3. // bufio为非阻塞的读和写操作创建一个读写缓冲流
  4. rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
  5. go readData(rw)
  6. go writeData(rw)
  7. //流将保持打开直到你关闭它(或者其它方关闭它)
  8. }

undefined读取

  1. func readData(rw *bufio.ReadWriter) {
  2. for {//无限循环,永不停歇地读取外面进来的数据
  3. //我们使用`ReadString`解析从其它节点发送过来的新的区块链(JSON字符串)。
  4. str, err := rw.ReadString('\n')//以'\n'为分隔符读取。返回的是字符串拷贝。
  5. if err != nil {//只有一种情况会返回err:没有遇到分隔符。
  6. log.Fatal(err)
  7. }
  8. if str == "" {//没有读取到任何数据
  9. return
  10. }
  11. if str != "\n" {
  12. chain := make([]Block, 0)//make只用于映射、切片和程道,不返回指针,这里创建一个[]Block类型的切片
  13. if err := json.Unmarshal([]byte(str), &chain); err != nil {//json转为结构对象
  14. log.Fatal(err)
  15. }
  16. mutex.Lock()//独占互斥锁,保护的是Blockchain
  17. if len(chain) > len(Blockchain) {//如果流中解析的区块链长度大于本地区块链的长度,替换本地区块链为读取的区块链
  18. Blockchain = chain
  19. bytes, err := json.MarshalIndent(Blockchain, "", " ")//缩进一个空格,对象Blockchain转为json
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
  24. }
  25. mutex.Unlock()
  26. }
  27. }
  28. }

undefined写

我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,它们会接受并扔掉。如果更长,他们会接受的。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。

  1. func writeData(rw *bufio.ReadWriter) {
  2. go func() {//每隔5秒广播我们的区块链的最新状态给我们的对等体
  3. for {
  4. time.Sleep(5 * time.Second)
  5. mutex.Lock()//互斥锁,保护Blockchain
  6. bytes, err := json.Marshal(Blockchain)//本地Blockchain转为json字符串
  7. if err != nil {
  8. log.Println(err)
  9. }
  10. mutex.Unlock()
  11. mutex.Lock()//互斥锁,独占rw
  12. rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
  13. rw.Flush()//将缓存的数据真正写入到网络上
  14. mutex.Unlock()
  15. }
  16. }()
  17. stdReader := bufio.NewReader(os.Stdin)//从终端读取待发送到信息(心率数)
  18. for {//无限循环,随时读取终端填写的心律数据,发送到网上
  19. fmt.Print("> ")
  20. sendData, err := stdReader.ReadString('\n')
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. sendData = strings.Replace(sendData, "\n", "", -1)
  25. bpm, err := strconv.Atoi(sendData)//将读取的字符串转为数字(心率为数字)
  26. if err != nil {
  27. log.Fatal(err)
  28. }
  29. newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)//创建区块
  30. if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
  31. mutex.Lock()//凡是读或写Blockchain,均需要开启互斥锁
  32. Blockchain = append(Blockchain, newBlock)//将新区块加入到区块链
  33. mutex.Unlock()
  34. }
  35. mutex.Lock()
  36. bytes, err := json.Marshal(Blockchain)//读取本地区块,转为json
  37. if err != nil {
  38. log.Println(err)
  39. }
  40. mutex.Unlock()
  41. //使用spew.Dump 这个函数可以以非常美观和方便阅读的方式将 struct、slice 等数据打印在控制台里,方便我们调试
  42. spew.Dump(Blockchain)
  43. mutex.Lock()//互斥锁,独占rw
  44. rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
  45. rw.Flush()//将本地区块数据写入到网络
  46. mutex.Unlock()
  47. }
  48. }
  • secio 是否允许安全传输。我们会一直把这个开关打开的。
  • target 指定想要连接的host地址,这里我们其实扮演的节点去连接其他host。
  • listenF打开指定端口让其他节点连接,这里我们扮演的host。

    我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,他们会接受并扔掉。如果更长,他们会接受并更新本地的区块链。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。

    我们进行一些字符串操作,以确保输入的BPM是一个整数,并且格式正确,可以添加为新块。我们通过我们的标准BangLink函数(见上面的“Blockchain stuff”部分)。然后,我们Marshal它,使它看起来漂亮,打印到我们的控制台,用spew.Dump验证。然后我们用rw.WriteString将它广播到我们的连接的对等体。

    创建了我们的处理程序和读写逻辑来处理输入和输出的块链。通过这些函数,我们已经为每个对等点创建了一种方法,以连续地相互检查其块链的状态,并且在同一时间,它们都被更新到最新状态(最长的有效块链)。

undefinedmain

  1. t := time.Now()
  2. genesisBlock := Block{}
  3. genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}
  4. Blockchain = append(Blockchain, genesisBlock)
  5. // LibP2P 使用 golog记录消息日志. 我们可以控制日志的详细程度
  6. golog.SetAllLoggers(gologging.INFO) // 变更为 DEBUG 可以获得额外的信息
  7. // 从命令行解析出选项
  8. listenF := flag.Int("l", 0, "等待到来的连接")
  9. target := flag.String("d", "", "连接的目标节点")
  10. secio := flag.Bool("secio", false, "启用 secio")
  11. seed := flag.Int64("seed", 0, "设定产生ID的随机种子")
  12. flag.Parse()
  13. if *listenF == 0 {
  14. log.Fatal("请提供绑定的端口 -l")
  15. }
  16. // 创建一个主机
  17. ha, err := makeBasicHost(*listenF, *secio, *seed)
  18. if err != nil {
  19. log.Fatal(err)
  20. }
  21. //target为我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方
  22. if *target == "" {//只充当主机
  23. log.Println("listening for connections")
  24. // 将流处理器设定在主机: A。 /p2p/1.0.0 是一个用户自定义的协议
  25. ha.SetStreamHandler("/p2p/1.0.0", handleStream)
  26. select {} // 一直挂起
  27. } else {//作为主机的对等方,连接到target主机
  28. ha.SetStreamHandler("/p2p/1.0.0", handleStream)//对等端仍然要开启监听,接受其它节点的连接
  29. //**下面的代码,是作为对等端B主机所做的工作:连接到A主机节点,执行读和写,实现两个节点之间的网络通信**
  30. // 下面的代码从目标节点的mutiaddress中展开获得节点的ID
  31. ipfsaddr, err := ma.NewMultiaddr(*target)
  32. if err != nil {
  33. log.Fatalln(err)
  34. }
  35. pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
  36. if err != nil {
  37. log.Fatalln(err)
  38. }
  39. peerid, err := peer.IDB58Decode(pid)//将58位的string转化为id
  40. if err != nil {
  41. log.Fatalln(err)
  42. }
  43. // 从目标主机解封装 /ipfs/<peerID>
  44. // /ip4/<a.b.c.d>/ipfs/<peer> 变成 /ip4/<a.b.c.d>
  45. targetPeerAddr, _ := ma.NewMultiaddr(
  46. fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
  47. targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
  48. // 我们有了一个节点ID和targetAddr,将它添加到peerstore
  49. // 这样LibP2就知道如何联系到它
  50. ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
  51. log.Println("正在打开B到A的网络流...")
  52. // 创建一个新的从主机B到A的流
  53. // 它应当被主机A通过我们上面设定的处理器进行处理
  54. // 因为我们使用相同的 /p2p/1.0.0 协议
  55. s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
  56. if err != nil {
  57. log.Fatalln(err)
  58. }
  59. // 创建一个新的缓冲流,这样读和写将不会阻塞
  60. rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
  61. // 创建一个线程读和写数据
  62. go writeData(rw)
  63. go readData(rw)
  64. select {} // 永远挂起
  65. }

我们设置所有的命令行标志。

  • secio 我们以前覆盖并允许安全流。我们将确保在运行程序时始终使用这个标志。
  • target 让我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方。
  • listenF打开了我们希望允许连接的端口,这意味着我们作为主机。我们既可以是主机(接收连接),也可以是对等体(连接到其他主机)。这就是这个系统真正成为P2P的原因!
  • seed 是可选的随机播种器,用来构造我们的地址,其他节点可以用来连接我们。

    然后,我们创建了一个新的主机,我们之前创建了makeBasicHost函数。如果我们只充当主机(即,我们没有连接到其他主机),我们指定如果*target==“”,则使用我们之前创建的setStreamHandle函数启动处理程序,这是我们的监听器代码的结束。

    如果我们确实想要连接到另一个主机,我们移动到else部分。我们再次设置我们的处理程序,因为我们作为一个主机和一个连接的对等体。

    接下来的几行解构了我们提供给目标的字符串,这样我们就可以找到我们想要连接的主机。这也被称为解封装。

    我们最终得到要连接的主机的peerID和目标地址targetAddr,并将该记录添加到“存储”中,以便跟踪我们与谁连接。

    然后,我们使用ha.NewStream创建想要连接到的对等体连接。我们希望能够接收和发送数据流(我们的区块链),因此就像我们在处理程序中做的那样,我们创建一个ReadWriter,并为readDatawriteData创建单独的Go例程。最后我们通过空的select来阻塞程序,这样程序不会停止。