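Preface

When designing a public blockchain, nodes need a P2P protocol to establish connections with one another and synchronize data. At the same time, upper-layer applications need to encapsulate communication logic on top of it, such as block synchronization and transaction synchronization between nodes.
This article gives a brief overview of how P2P networks have evolved, then walks through Ethereum's node discovery mechanism, distributed hash table, node lookup, node addition, and node removal from a source-code perspective, and briefly analyzes the security design of its P2P network.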

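Background

P2P Networks

P2P networks differ from the traditional client/server (CS) architecture: every node in a P2P network can act as both client and server, and the communication protocol between nodes is generally implemented directly over sockets.
P2P technology has gone through the following four stages of development: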

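_1. Centralized:_ This is the simplest routing scheme in P2P networks. A central node stores index information for all other nodes, typically each node's IP address, port, and resources. Centralized routing is structurally simple and easy to implement, but its drawbacks are just as clear: because the central node must store routing information for every node, it easily becomes a performance bottleneck as the network grows, and it is also a single point of failure.

_2. Distributed:_ The central node is removed and the P2P nodes form a random network. A new node picks an existing node at random and establishes a neighbor relationship with it, which yields a random topology. After connecting to its neighbor, the new node must also broadcast to the whole network so that every node learns of its existence.
The broadcast works as follows: the node first broadcasts to its neighbors; each neighbor that receives the message then broadcasts to its own neighbors, and so on. This is known as "flooding". Flooding is hard to control and has two major problems. The first is flooding loops: for example, a message sent by node A passes through node B to node C, and node C broadcasts it back to node A, forming a loop. The second is response storms: if the resource requested by node A is held by many nodes, a large number of nodes will send responses to A within a short time, which may well crash node A.
Flooding loops can be eliminated by borrowing the flood-control techniques of IP routing protocols. One approach is to give every query message a TTL value that is decremented by 1 each time the message is forwarded; a node that receives a message whose TTL is 0 does not forward it, so queries cannot loop forever. Another is to give each flooded message a unique identifier and to stop forwarding duplicates. Response storms can be mitigated by segmenting the network at the data link layer to reduce cross-segment broadcasts.

_3. Hybrid:_ The hybrid model combines the centralized and distributed structures. The network contains many super nodes that form a distributed network among themselves, while each super node and the ordinary nodes attached to it form a local centralized network. A new ordinary node first contacts one super node, which pushes it a list of other super nodes; the new node then chooses a specific super node as its parent based on the state of the super nodes in the list. Flooding happens only among super nodes, which avoids large-scale flooding problems. In practice the hybrid structure is a flexible and effective architecture that is also relatively easy to implement, so many systems today are built on it.

_4. Structured:_ A structured P2P network is also distributed, but unlike the random distributed network described above, it organizes all nodes according to some structure, such as a ring or a tree. Structured networks are generally built on a distributed hash table (DHT) algorithm; concrete implementations include Chord, Pastry, CAN, and Kademlia.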

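The four network structures compare as follows:
[Figure: comparison of the four network structures. Source: Knownsec Blockchain Security Lab | Understanding Ethereum P2P Network Design in Depth]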
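The two loop-prevention ideas described for the distributed (flooding) model, a TTL that shrinks on every hop and dropping messages that were already seen, can be sketched as follows. This is a minimal illustration on an in-memory graph; the `Message`, `Network`, and `Flood` names are hypothetical and do not come from any real client.

```go
package main

import "fmt"

// Message is a hypothetical flooded query; ID and TTL are illustrative fields.
type Message struct {
	ID  int
	TTL int
}

// Network maps each node name to its neighbors (an undirected random topology).
type Network map[string][]string

// Flood delivers msg starting at origin and returns how many times each node
// processed it. A node forwards only while TTL > 0 and only the first time
// it sees the message.
func Flood(net Network, origin string, msg Message) map[string]int {
	processed := map[string]int{}
	seen := map[string]bool{} // nodes that already saw msg.ID
	type hop struct {
		node string
		ttl  int
	}
	queue := []hop{{origin, msg.TTL}}
	for len(queue) > 0 {
		h := queue[0]
		queue = queue[1:]
		if seen[h.node] {
			continue // duplicate: drop instead of forwarding again
		}
		seen[h.node] = true
		processed[h.node]++
		if h.ttl == 0 {
			continue // TTL exhausted: do not forward further
		}
		for _, nb := range net[h.node] {
			queue = append(queue, hop{nb, h.ttl - 1})
		}
	}
	return processed
}

func main() {
	net := Network{"A": {"B", "C"}, "B": {"A", "C"}, "C": {"A", "B", "D"}, "D": {"C"}}
	fmt.Println(Flood(net, "A", Message{ID: 1, TTL: 1})) // D is out of reach
	fmt.Println(Flood(net, "A", Message{ID: 1, TTL: 3})) // every node, once each
}
```

Even though A, B, and C form a cycle, each node handles the message exactly once, and a small TTL bounds how far the query spreads.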

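Node Discovery

Node discovery is the first step for any node joining a P2P network. It comes in two forms:

  • Initial node discovery: the node is brand new and has never run before, so it has no data about other nodes in the network. Discovery can only rely on the seed nodes hard-coded into the node to obtain information about the P2P network.

  • Known-node discovery: the node has run before and its node database holds information about other nodes in the network. Discovery can use the nodes stored in the database to obtain information about the P2P network and build the node's own network topology.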

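Seed Nodes

In a P2P network, a newly started node quickly discovers other nodes through a set of long-running, stable nodes. These are called "seed nodes", and their information is usually hard-coded. Seed nodes generally come in two kinds:

  • DNS-Seed: also called a "DNS seed node". DNS is the Internet's domain name lookup service; it stores the mapping between domain names and IP addresses in a distributed database, and when given a domain name, a DNS server returns the corresponding IP address.

  • IP-Seed: the IP addresses of the seed nodes are hard-coded directly into the source code.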

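Kademlia (KAD) Algorithm

Kademlia is a distributed hash table (DHT) technique. Unlike other DHT techniques, Kademlia uses XOR to compute the distance between nodes and builds a new DHT topology on that metric, which greatly speeds up route lookups.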
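As a rough illustration of the XOR metric, the sketch below computes the logarithmic distance between two IDs, that is, the bit length of their XOR. The 4-byte `ID` type and the `LogDist` name are assumptions chosen to keep the example short; Ethereum node IDs are 32 bytes.

```go
package main

import (
	"fmt"
	"math/bits"
)

// ID is a shortened 4-byte node identifier used only for illustration.
type ID [4]byte

// LogDist returns the logarithmic XOR distance between a and b:
// the bit length of a XOR b, or 0 when the IDs are equal.
func LogDist(a, b ID) int {
	lz := 0 // leading zero bits of a XOR b
	for i := range a {
		x := a[i] ^ b[i]
		if x == 0 {
			lz += 8
			continue
		}
		lz += bits.LeadingZeros8(x)
		break
	}
	return len(a)*8 - lz
}

func main() {
	self := ID{0x00, 0x00, 0x00, 0x00}
	fmt.Println(LogDist(self, ID{0x00, 0x00, 0x00, 0x01})) // 1
	fmt.Println(LogDist(self, ID{0x80, 0x00, 0x00, 0x00})) // 32
	fmt.Println(LogDist(self, self))                       // 0
}
```

IDs that share a long common prefix are "close"; an ID that differs in the very first bit is at the maximum distance.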

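HashTable

A hash table is a container for key-value pairs (also called Key/Value pairs). The data structure contains N buckets; for a given hash table, N is usually fixed, so the buckets can be numbered 0 to N-1. A bucket stores key-value pairs and can be thought of as a dynamic array holding several of them.
The figure below illustrates how a hash table lookup works and why a value can be retrieved quickly by key. To look up a key, a hash function first computes the key's hash value, usually an integer; taking that hash value modulo N (the number of buckets) then gives the bucket number.
[Figure: hash table lookup]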

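HashCollision

Any discussion of hash tables has to mention collisions. When two different keys hash to the same value, that is a hash function collision, and the two key-value pairs end up in the same bucket. A second kind of collision occurs when the hash values differ but yield the same bucket number after the modulo operation; the two pairs are again stored in one bucket. The figure below illustrates hash collisions.
If a hash table stores its data with no collisions at all, every bucket holds 0 or 1 key-value pairs and lookups are very fast. Conversely, if collisions are severe, some buckets hold many key-value pairs, and a lookup has to compare keys one by one within the bucket, which makes it much slower.
[Figure: hash collisions]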

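Distributed Hash Table

A distributed hash table is conceptually similar to an ordinary hash table. The difference is that an ordinary hash table lives inside a single program on one machine, whereas a distributed hash table is used in a distributed system, where each node can loosely be thought of as a bucket of the hash table. Distributed hash tables are mainly used to store large (even massive) amounts of data. The principle is shown in the figure below:
[Figure: distributed hash table]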

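Source Code Analysis

Ethereum's underlying P2P server can be roughly divided into three layers:

  • Top layer: the concrete implementations of Ethereum's protocols.

  • Middle layer: the p2p communication link layer, responsible for starting the listener, handling newly joined connections, and maintaining connections.

  • Bottom layer: the network I/O layer for data communication, mainly responsible for routing table management and database reads and writes.

[Figure: the three layers of the Ethereum P2P server]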

Table Structure

The table data structure is shown below:

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L40
const (
    alpha           = 3  // Kademlia concurrency factor
    bucketSize      = 16 // Kademlia bucket size
    maxReplacements = 10 // Size of per-bucket replacement list
    // We keep buckets for the upper 1/15 of distances because
    // it's very unlikely we'll ever encounter a node that's closer.
    hashBits          = len(common.Hash{}) * 8
    nBuckets          = hashBits / 15       // Number of buckets
    bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket
    // IP address limits.
    bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
    tableIPLimit, tableSubnet   = 10, 24
    refreshInterval    = 30 * time.Minute
    revalidateInterval = 10 * time.Second
    copyNodesInterval  = 30 * time.Second
    seedMinTableTime   = 5 * time.Minute
    seedCount          = 30
    seedMaxAge         = 5 * 24 * time.Hour
)

type Table struct {
    mutex   sync.Mutex        // protects buckets, bucket content, nursery, rand
    buckets [nBuckets]*bucket // index of known nodes by distance
    nursery []*node           // bootstrap nodes
    rand    *mrand.Rand       // source of randomness, periodically reseeded
    ips     netutil.DistinctNetSet
    log        log.Logger
    db         *enode.DB // database of known nodes
    net        transport
    refreshReq chan chan struct{}
    initDone   chan struct{}
    closeReq   chan struct{}
    closed     chan struct{}
    nodeAddedHook func(*node) // for testing
}

type bucket struct {
    entries      []*node // live entries, sorted by time of last contact
    replacements []*node // recently seen nodes to be used if revalidation fails
    ips          netutil.DistinctNetSet
}

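Several key fields:

  • buckets: the k-buckets, which index the known nodes by distance. Each k-bucket holds nodes sorted by time of last contact.

  • nursery: the seed (bootstrap) nodes. When a node starts it can connect to at most 35 seed nodes: five are specified by the Ethereum project, and the other 30 are loaded from the database.

  • db: the database that stores P2P nodes (Ethereum uses two databases; the other stores on-chain data).

  • refreshReq: the channel that carries k-bucket refresh requests.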
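To make the constants concrete, the sketch below works out `nBuckets` and `bucketMinDistance` for a 256-bit hash and maps a logarithmic distance to a bucket index. The constants are copied from the listing above; the `bucketIndex` helper is written for illustration, under the assumption that all distances at or below `bucketMinDistance` share the first bucket.

```go
package main

import "fmt"

const (
	hashBits          = 256                 // len(common.Hash{}) * 8
	nBuckets          = hashBits / 15       // 17 buckets
	bucketMinDistance = hashBits - nBuckets // 239
)

// bucketIndex maps a logarithmic distance (0..256) to a bucket index.
// Assumption: every distance <= bucketMinDistance falls into bucket 0.
func bucketIndex(logDist int) int {
	if logDist <= bucketMinDistance {
		return 0
	}
	return logDist - bucketMinDistance - 1
}

func main() {
	fmt.Println(nBuckets, bucketMinDistance) // 17 239
	fmt.Println(bucketIndex(256))            // 16, the farthest bucket
	fmt.Println(bucketIndex(240))            // 0
	fmt.Println(bucketIndex(100))            // 0, very close nodes share bucket 0
}
```

Because node IDs are effectively random, roughly half of all nodes sit at distance 256, a quarter at 255, and so on, which is why keeping buckets only for the largest distances loses very little.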

Table Creation

The newTable function creates a new table:

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L102
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
    tab := &Table{
        net:        t,
        db:         db,
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
        log:        log,
    }
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
    tab.seedRand()
    tab.loadSeedNodes()
    return tab, nil
}

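The code above first initializes a Table object, tab, from the arguments, then calls setFallbackNodes to set the initial bootstrap nodes (the five nursery nodes, which are used to connect to the network later if the table is empty or the database contains no node information). Inside setFallbackNodes, a for loop calls ValidateComplete on each node to check that it is valid.
The k-buckets are then initialized: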

    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }

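Next, seed nodes are loaded into their corresponding buckets: up to 30 (seedCount) nodes are picked at random from the node database, and the nursery nodes are appended to them: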

    tab.seedRand()
    tab.loadSeedNodes()
    return tab, nil

loadSeedNodes is implemented as follows (seedCount is the constant defined at the top of table.go, with a value of 30):

// filedir: go-ethereum-1.10.2\p2p\discover\table.go    L302
func (tab *Table) loadSeedNodes() {
    seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
    seeds = append(seeds, tab.nursery...)
    for i := range seeds {
        seed := seeds[i]
        age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
        tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
        tab.addSeenNode(seed)
    }
}

addSeenNode adds a node to a bucket. It checks whether the node is already present and whether the bucket is full; if the bucket is full, it calls tab.addReplacement(b, n) to put the node on the replacement list instead. Otherwise it registers the node's IP address and then updates the bucket:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L458
func (tab *Table) addSeenNode(n *node) {
    if n.ID() == tab.self().ID() {
        return
    }
    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    b := tab.bucket(n.ID())
    if contains(b.entries, n.ID()) {
        // Already in bucket, don't add.
        return
    }
    if len(b.entries) >= bucketSize {
        // Bucket full, maybe add as replacement.
        tab.addReplacement(b, n)
        return
    }
    if !tab.addIP(b, n.IP()) {
        // Can't add: IP limit reached.
        return
    }
    // Add to end of bucket:
    b.entries = append(b.entries, n)
    b.replacements = deleteNode(b.replacements, n)
    n.addedAt = time.Now()
    if tab.nodeAddedHook != nil {
        tab.nodeAddedHook(n)
    }
}
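The addIP check in addSeenNode is what enforces the bucketIPLimit and tableIPLimit constants ("at most 2 addresses from the same /24" per bucket). The sketch below shows the general idea with a simplified per-subnet counter. It is a stand-in written for this article, not go-ethereum's netutil.DistinctNetSet, and it handles IPv4 only.

```go
package main

import (
	"fmt"
	"net"
)

// subnetLimiter is a hypothetical, simplified per-subnet address counter.
type subnetLimiter struct {
	prefixBits int            // subnet size, e.g. 24 for a /24
	limit      int            // maximum addresses accepted per subnet
	counts     map[string]int // subnet -> number of accepted addresses
}

func newSubnetLimiter(prefixBits, limit int) *subnetLimiter {
	return &subnetLimiter{prefixBits: prefixBits, limit: limit, counts: map[string]int{}}
}

// Add records ip and reports whether it was accepted. An address is
// rejected once its subnet already holds `limit` addresses.
func (s *subnetLimiter) Add(ip net.IP) bool {
	ip4 := ip.To4()
	if ip4 == nil {
		return false // IPv6 and invalid input are out of scope for this sketch
	}
	key := ip4.Mask(net.CIDRMask(s.prefixBits, 32)).String()
	if s.counts[key] >= s.limit {
		return false
	}
	s.counts[key]++
	return true
}

// AddString is a convenience wrapper that parses addr first.
func (s *subnetLimiter) AddString(addr string) bool { return s.Add(net.ParseIP(addr)) }

func main() {
	bucket := newSubnetLimiter(24, 2) // mirrors bucketSubnet, bucketIPLimit
	for _, a := range []string{"10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.1.1"} {
		fmt.Println(a, bucket.AddString(a))
	}
}
```

Limits like this make it harder for an attacker who controls a single address range to fill a victim's buckets with their own nodes.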

Event Loop

loop is the main loop in table.go. It begins by declaring the local variables used later and then kicks off an initial refresh with doRefresh. The loop refreshes the k-buckets every 30 minutes, checks about every 10 seconds whether a k-bucket node still responds to ping, and every 30 seconds stores nodes that have been in a k-bucket for more than 5 minutes in the local database, treating them as stable nodes:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L55
refreshInterval    = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval  = 30 * time.Second
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L218
// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.
func (tab *Table) loop() {
    var (
        revalidate     = time.NewTimer(tab.nextRevalidateTime())
        refresh        = time.NewTicker(refreshInterval)
        copyNodes      = time.NewTicker(copyNodesInterval)
        refreshDone    = make(chan struct{})           // where doRefresh reports completion
        revalidateDone chan struct{}                   // where doRevalidate reports completion
        waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
    )
    defer refresh.Stop()
    defer revalidate.Stop()
    defer copyNodes.Stop()
    // Start initial refresh.
    go tab.doRefresh(refreshDone)
loop:
    for {
        select {
        case <-refresh.C: // periodic k-bucket refresh, refreshInterval = 30 min
            tab.seedRand()
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case req := <-tab.refreshReq: // explicit request to refresh the k-buckets
            waiting = append(waiting, req)
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case <-refreshDone: // doRefresh finished
            for _, ch := range waiting {
                close(ch)
            }
            waiting, refreshDone = nil, nil
        case <-revalidate.C: // check that a k-bucket node is still alive, about every 10 seconds
            revalidateDone = make(chan struct{})
            go tab.doRevalidate(revalidateDone)
        case <-revalidateDone: // revalidation finished
            revalidate.Reset(tab.nextRevalidateTime())
            revalidateDone = nil
        case <-copyNodes.C: // every 30 seconds, persist nodes that have been in a k-bucket for more than 5 minutes (considered stable)
            go tab.copyLiveNodes()
        case <-tab.closeReq:
            break loop
        }
    }
    if refreshDone != nil {
        <-refreshDone
    }
    for _, ch := range waiting {
        close(ch)
    }
    if revalidateDone != nil {
        <-revalidateDone
    }
    close(tab.closed)
}

Node Lookup

getNode looks up a node by ID and returns nil if the node is not in the table:

// getNode returns the node with the given ID or nil if it isn't in the table.
func (tab *Table) getNode(id enode.ID) *enode.Node {
    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    b := tab.bucket(id)
    for _, e := range b.entries {
        if e.ID() == id {
            return unwrapNode(e)
        }
    }
    return nil
}

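Node Discovery

Ethereum's distributed network follows the structured network model and is implemented with the Kademlia protocol. This section briefly describes node discovery. In Ethereum k is 16, so each k-bucket holds up to 16 nodes. Node IDs are hashed to 256 bits, which gives 256 possible logarithmic distances, but as the constants in table.go show, the table keeps only nBuckets = hashBits / 15 = 17 k-buckets for the farthest distances, because closer nodes are very unlikely to be encountered. A k-bucket records each node's NodeId, Distance, Endpoint, IP, and similar information, and lookup results are ordered by distance to the target node. Node lookup is driven by doRefresh():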

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L278

func (tab *Table) doRefresh(done chan struct{}) {
    defer close(done)

    tab.loadSeedNodes()
    // Run self lookup to discover new neighbor nodes.
    tab.net.lookupSelf()

    for i := 0; i < 3; i++ {
        tab.net.lookupRandom()
    }
}

As the code above shows, tab.loadSeedNodes() is called first to load nodes from the database and insert them into the table:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L302
func (tab *Table) loadSeedNodes() {
    seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
    seeds = append(seeds, tab.nursery...)
    for i := range seeds {
        seed := seeds[i]
        age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
        tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
        tab.addSeenNode(seed)
    }
}
// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L440
// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
    var (
        now   = time.Now()
        nodes = make([]*Node, 0, n)
        it    = db.lvl.NewIterator(nil, nil)
        id    ID
    )
    defer it.Release()
seek:
    for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {

        ctr := id[0]
        rand.Read(id[:])
        id[0] = ctr + id[0]%16
        it.Seek(nodeKey(id))
        n := nextNode(it)
        if n == nil {
            id[0] = 0
            continue seek // iterator exhausted
        }
        if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
            continue seek
        }
        for i := range nodes {
            if nodes[i].ID() == n.ID() {
                continue seek // duplicate
            }
        }
        nodes = append(nodes, n)
    }
    return nodes
}

lookupSelf is then used to discover new nodes. It runs newLookup with the local node's own ID as the target in order to find neighboring nodes:

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L281
// lookupSelf looks up our own node ID.
// This is needed to satisfy the transport interface.
func (t *UDPv5) lookupSelf() []*enode.Node {
    return t.newLookup(t.closeCtx, t.Self().ID()).run()
}
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L292
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
    return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
        return t.lookupWorker(n, target)
    })
}
func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup {
    it := &lookup{
        tab:       tab,
        queryfunc: q,
        asked:     make(map[enode.ID]bool),
        seen:      make(map[enode.ID]bool),
        result:    nodesByDistance{target: target},
        replyCh:   make(chan []*node, alpha),
        cancelCh:  ctx.Done(),
        queries:   -1,
    }
    // Don't query further if we hit ourself.
    // Unlikely to happen often in practice.
    it.asked[tab.self().ID()] = true
    return it
}

Finally, lookups are run against random targets:

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L275
func (t *UDPv5) lookupRandom() []*enode.Node {
    return t.newRandomLookup(t.closeCtx).run()
}
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L287
func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {
    var target enode.ID
    crand.Read(target[:])
    return t.newLookup(ctx, target)
}
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
    return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
        return t.lookupWorker(n, target)
    })
}
// lookupWorker performs FINDNODE calls against a single node during lookup.
func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
    var (
        dists = lookupDistances(target, destNode.ID())
        nodes = nodesByDistance{target: target}
        err   error
    )
    var r []*enode.Node
    r, err = t.findnode(unwrapNode(destNode), dists)
    if err == errClosed {
        return nil, err
    }
    for _, n := range r {
        if n.ID() != t.Self().ID() {
            nodes.push(wrapNode(n), findnodeResultLimit)
        }
    }
    return nodes.entries, err
}
// filedir:go-ethereum-1.10.2\p2p\discover\v5_udp.go L362
// findnode calls FINDNODE on a node and waits for responses.
func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {
    resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})
    return t.waitForNodes(resp, distances)
}

func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {
    c := &callV5{
        node:         node,
        packet:       packet,
        responseType: responseType,
        reqid:        make([]byte, 8),
        ch:           make(chan v5wire.Packet, 1),
        err:          make(chan error, 1),
    }
    // Assign request ID.
    crand.Read(c.reqid)
    packet.SetRequestID(c.reqid)
    // Send call to dispatch.
    select {
    case t.callCh <- c:
    case <-t.closeCtx.Done():
        c.err <- errClosed
    }
    return c
}
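The overall shape of a Kademlia lookup (keep the closest known nodes, ask the closest ones that have not been asked yet, merge their answers, repeat) can be sketched on a toy in-memory network. Everything here is an assumption made for illustration: IDs are single bytes, the `network` map stands in for FINDNODE, and queries run sequentially, whereas the real lookup issues up to alpha queries concurrently over UDP.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	alpha = 3 // nodes queried per round (Kademlia concurrency factor)
	k     = 4 // size of the result set; kept small for the example
)

// network is a toy stand-in for FINDNODE: node ID -> IDs that node knows.
type network map[uint8][]uint8

// closest sorts ids by XOR distance to target and keeps at most k of them.
func closest(ids []uint8, target uint8) []uint8 {
	sort.Slice(ids, func(i, j int) bool { return ids[i]^target < ids[j]^target })
	if len(ids) > k {
		ids = ids[:k]
	}
	return ids
}

// lookup asks the closest not-yet-asked nodes for their neighbors until
// every node in the result set has been asked.
func lookup(net network, seeds []uint8, target uint8) []uint8 {
	asked := map[uint8]bool{}
	seen := map[uint8]bool{}
	var result []uint8
	for _, s := range seeds {
		if !seen[s] {
			seen[s] = true
			result = append(result, s)
		}
	}
	result = closest(result, target)
	for {
		queried := 0
		for _, id := range append([]uint8(nil), result...) {
			if asked[id] || queried == alpha {
				continue
			}
			asked[id] = true
			queried++
			for _, n := range net[id] {
				if !seen[n] {
					seen[n] = true
					result = append(result, n)
				}
			}
		}
		result = closest(result, target)
		if queried == 0 {
			return result // nothing left to ask: the lookup has converged
		}
	}
}

func main() {
	net := network{1: {2, 3}, 2: {4}, 3: {5}, 4: {7}, 7: {6}}
	fmt.Println(lookup(net, []uint8{1}, 6)) // [6 7 4 5]
}
```

Starting from the single seed node 1, the lookup walks toward target 6 and ends with the four known nodes closest to it by XOR distance.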

Server Structure

The server-side data structure is shown below:

// filedir:go-ethereum-1.10.2\p2p\server.go  L160
// Server manages all peer connections.
type Server struct {
    // Config fields may not be modified while the server is running.
    Config
    // Hooks for testing. These are useful because we can inhibit
    // the whole protocol stack.
    newTransport func(net.Conn, *ecdsa.PublicKey) transport
    newPeerHook  func(*Peer)
    listenFunc   func(network, addr string) (net.Listener, error)
    lock    sync.Mutex // protects running
    running bool
    listener     net.Listener
    ourHandshake *protoHandshake
    loopWG       sync.WaitGroup // loop, listenLoop
    peerFeed     event.Feed
    log          log.Logger
    nodedb    *enode.DB
    localnode *enode.LocalNode
    ntab      *discover.UDPv4
    DiscV5    *discover.UDPv5
    discmix   *enode.FairMix
    dialsched *dialScheduler
    // Channels into the run loop.
    quit                    chan struct{}
    addtrusted              chan *enode.Node
    removetrusted           chan *enode.Node
    peerOp                  chan peerOpFunc
    peerOpDone              chan struct{}
    delpeer                 chan peerDrop
    checkpointPostHandshake chan *conn
    checkpointAddPeer       chan *conn
    // State of run loop and listenLoop.
    inboundHistory expHeap
}

Server configuration (the local node's private key, maximum number of peers, dial ratio, event logging, and so on):

// Config holds Server options.
type Config struct {
    PrivateKey       *ecdsa.PrivateKey `toml:"-"`
    MaxPeers         int
    MaxPendingPeers  int `toml:",omitempty"`
    DialRatio        int `toml:",omitempty"`
    NoDiscovery      bool
    DiscoveryV5      bool   `toml:",omitempty"`
    Name             string `toml:"-"`
    BootstrapNodes   []*enode.Node
    BootstrapNodesV5 []*enode.Node `toml:",omitempty"`
    StaticNodes      []*enode.Node
    TrustedNodes     []*enode.Node
    NetRestrict      *netutil.Netlist `toml:",omitempty"`
    NodeDatabase     string           `toml:",omitempty"`
    Protocols        []Protocol       `toml:"-"`
    ListenAddr       string
    NAT              nat.Interface `toml:",omitempty"`
    Dialer           NodeDialer    `toml:"-"`
    NoDial           bool          `toml:",omitempty"`
    EnableMsgEvents  bool
    Logger           log.Logger `toml:",omitempty"`
    clock            mclock.Clock
}

Adding Nodes

The AddPeer function adds the given node; it is implemented as follows:

// filedir:go-ethereum-1.10.2\p2p\server.go  L318

func (srv *Server) AddPeer(node *enode.Node) {
    srv.dialsched.addStatic(node)
}
// filedir:go-ethereum-1.10.2\p2p\dial.go  L190
// addStatic adds a static dial candidate.
func (d *dialScheduler) addStatic(n *enode.Node) {
    select {
    case d.addStaticCh <- n:
    case <-d.ctx.Done():
    }
}

The AddTrustedPeer function adds a trusted node:

// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *enode.Node) {
    select {
    case srv.addtrusted <- node:
    case <-srv.quit:
    }
}
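addStatic and AddTrustedPeer share the same idiom: a select that either hands the node to the run loop or gives up once the quit (or context) channel is closed, so callers never block forever after shutdown. A minimal standalone sketch of that idiom follows; the `trySend` name and the string payload are illustrative only.

```go
package main

import "fmt"

// trySend delivers v to ch unless quit is closed first. It reports
// whether the value was delivered.
func trySend(ch chan<- string, quit <-chan struct{}, v string) bool {
	select {
	case ch <- v:
		return true
	case <-quit:
		return false
	}
}

func main() {
	ch := make(chan string)
	quit := make(chan struct{})

	// A receiver is running, so the send goes through.
	done := make(chan string)
	go func() { done <- <-ch }()
	fmt.Println(trySend(ch, quit, "node-1"), <-done)

	// No receiver and the server has shut down: the call returns
	// instead of blocking forever.
	close(quit)
	fmt.Println(trySend(ch, quit, "node-2"))
}
```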

Removing Nodes

The RemovePeer function removes a node and disconnects from it:

// filedir:go-ethereum-1.10.2\p2p\server.go  L325
func (srv *Server) RemovePeer(node *enode.Node) {
    var (
        ch  chan *PeerEvent
        sub event.Subscription
    )
    // Disconnect the peer on the main loop.
    srv.doPeerOp(func(peers map[enode.ID]*Peer) {
        srv.dialsched.removeStatic(node)
        if peer := peers[node.ID()]; peer != nil {
            ch = make(chan *PeerEvent, 1)
            sub = srv.peerFeed.Subscribe(ch)
            peer.Disconnect(DiscRequested)
        }
    })
    // Wait for the peer connection to end.
    if ch != nil {
        defer sub.Unsubscribe()
        for ev := range ch {
            if ev.Peer == node.ID() && ev.Type == PeerEventTypeDrop {
                return
            }
        }
    }
}
// filedir:go-ethereum-1.10.2\p2p\dial.go  L198
// removeStatic removes a static dial candidate.
func (d *dialScheduler) removeStatic(n *enode.Node) {
    select {
    case d.remStaticCh <- n:
    case <-d.ctx.Done():
    }
}

The RemoveTrustedPeer function removes a trusted node:

// RemoveTrustedPeer removes the given node from the trusted peer set.
func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
    select {
    case srv.removetrusted <- node:
    case <-srv.quit:
    }
}

Stopping the Service

The Stop function shuts the node down:

func (srv *Server) Stop() {
    srv.lock.Lock()
    if !srv.running {
        srv.lock.Unlock()
        return
    }
    srv.running = false
    if srv.listener != nil {
        // this unblocks listener Accept
        srv.listener.Close()
    }
    close(srv.quit)
    srv.lock.Unlock()
    srv.loopWG.Wait()
}

Starting the Service

The Start function in go-ethereum-1.10.2\p2p\server.go starts a P2P node:

// filedir:go-ethereum-1.10.2\p2p\server.go  L433
func (srv *Server) Start() (err error) {
    srv.lock.Lock()
    defer srv.lock.Unlock()
    if srv.running {
        return errors.New("server already running")
    }
    srv.running = true
    srv.log = srv.Config.Logger
    if srv.log == nil {
        srv.log = log.Root()
    }
    if srv.clock == nil {
        srv.clock = mclock.System{}
    }
    if srv.NoDial && srv.ListenAddr == "" {
        srv.log.Warn("P2P server will be useless, neither dialing nor listening")
    }
    // static fields
    if srv.PrivateKey == nil {
        return errors.New("Server.PrivateKey must be set to a non-nil key")
    }
    if srv.newTransport == nil {
        srv.newTransport = newRLPX
    }
    if srv.listenFunc == nil {
        srv.listenFunc = net.Listen
    }
    srv.quit = make(chan struct{})
    srv.delpeer = make(chan peerDrop)
    srv.checkpointPostHandshake = make(chan *conn)
    srv.checkpointAddPeer = make(chan *conn)
    srv.addtrusted = make(chan *enode.Node)
    srv.removetrusted = make(chan *enode.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})
    if err := srv.setupLocalNode(); err != nil {
        return err
    }
    if srv.ListenAddr != "" {
        if err := srv.setupListening(); err != nil {
            return err
        }
    }
    if err := srv.setupDiscovery(); err != nil {
        return err
    }
    srv.setupDialScheduler()
    srv.loopWG.Add(1)
    go srv.run()
    return nil
}

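Start first checks whether the server is already running; if so, it returns an error immediately, otherwise it sets srv.running to true and proceeds with startup. It then sets up the logger (falling back to the root logger if none is configured) and initializes the rest of the P2P server state: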

// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
    srv.lock.Lock()
    defer srv.lock.Unlock()
    if srv.running {
        return errors.New("server already running")
    }
    srv.running = true
    srv.log = srv.Config.Logger
    if srv.log == nil {
        srv.log = log.Root()
    }
    if srv.clock == nil {
        srv.clock = mclock.System{}
    }
    if srv.NoDial && srv.ListenAddr == "" {
        srv.log.Warn("P2P server will be useless, neither dialing nor listening")
    }
    // static fields
    if srv.PrivateKey == nil {
        return errors.New("Server.PrivateKey must be set to a non-nil key")
    }
    if srv.newTransport == nil {
        srv.newTransport = newRLPX
    }
    if srv.listenFunc == nil {
        srv.listenFunc = net.Listen
    }
    srv.quit = make(chan struct{})
    srv.delpeer = make(chan peerDrop)
    srv.checkpointPostHandshake = make(chan *conn)
    srv.checkpointAddPeer = make(chan *conn)
    srv.addtrusted = make(chan *enode.Node)
    srv.removetrusted = make(chan *enode.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})

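Next, setupLocalNode creates the local node, setupListening starts the local listener, and setupDiscovery configures the discovery protocols (including DiscoveryV5) and builds the node routing table.
After that, setupDialScheduler starts the outbound dialing process, and a goroutine is launched to maintain the peers: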

    srv.setupDialScheduler()
    srv.loopWG.Add(1)
    go srv.run()
    return nil
}

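setupDialScheduler is shown below. It uses newDialScheduler to create the dial scheduler. The discmix argument, an iterator, determines the set of nodes to dial, and the connection setup function SetupConn is passed in as well: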

func (srv *Server) setupDialScheduler() {
    config := dialConfig{
        self:           srv.localnode.ID(),
        maxDialPeers:   srv.maxDialedConns(),
        maxActiveDials: srv.MaxPendingPeers,
        log:            srv.Logger,
        netRestrict:    srv.NetRestrict,
        dialer:         srv.Dialer,
        clock:          srv.clock,
    }
    if srv.ntab != nil {
        config.resolver = srv.ntab
    }
    if config.dialer == nil {
        config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
    }
    srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
    for _, n := range srv.StaticNodes {
        srv.dialsched.addStatic(n)
    }
}

newDialScheduler is shown below. d.readNodes(it) pulls nodes from the iterator and passes them over a channel to d.loop(it), which dials them:

// filedir:go-ethereum-1.10.2\p2p\dial.go   L162
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
    d := &dialScheduler{
        dialConfig:  config.withDefaults(),
        setupFunc:   setupFunc,
        dialing:     make(map[enode.ID]*dialTask),
        static:      make(map[enode.ID]*dialTask),
        peers:       make(map[enode.ID]connFlag),
        doneCh:      make(chan *dialTask),
        nodesIn:     make(chan *enode.Node),
        addStaticCh: make(chan *enode.Node),
        remStaticCh: make(chan *enode.Node),
        addPeerCh:   make(chan *conn),
        remPeerCh:   make(chan *conn),
    }
    d.lastStatsLog = d.clock.Now()
    d.ctx, d.cancel = context.WithCancel(context.Background())
    d.wg.Add(2)
    go d.readNodes(it)
    go d.loop(it)
    return d
}

Service Listening

The startup sequence above calls setupListening, which starts listening for inbound connections:

func (srv *Server) setupListening() error {
    // Launch the listener.
    listener, err := srv.listenFunc("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    srv.listener = listener
    srv.ListenAddr = listener.Addr().String()
    // Update the local node record and map the TCP listening port if NAT is configured.
    if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
        srv.localnode.Set(enr.TCP(tcp.Port))
        if !tcp.IP.IsLoopback() && srv.NAT != nil {
            srv.loopWG.Add(1)
            go func() {
                nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
                srv.loopWG.Done()
            }()
        }
    }
    srv.loopWG.Add(1)
    go srv.listenLoop()
    return nil
}

The code above in turn starts srv.listenLoop(), a goroutine that loops indefinitely, listening on the port and accepting inbound connections:

// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
    srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
    // The slots channel limits accepts of new connections.
    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        slots <- struct{}{}
    }
    // Wait for slots to be returned on exit. This ensures all connection goroutines
    // are down before listenLoop returns.
    defer srv.loopWG.Done()
    defer func() {
        for i := 0; i < cap(slots); i++ {
            <-slots
        }
    }()
    for {
        // Wait for a free slot before accepting.
        <-slots
        var (
            fd      net.Conn
            err     error
            lastLog time.Time
        )
        for {
            fd, err = srv.listener.Accept()
            if netutil.IsTemporaryError(err) {
                if time.Since(lastLog) > 1*time.Second {
                    srv.log.Debug("Temporary read error", "err", err)
                    lastLog = time.Now()
                }
                time.Sleep(time.Millisecond * 200)
                continue
            } else if err != nil {
                srv.log.Debug("Read error", "err", err)
                slots <- struct{}{}
                return
            }
            break
        }
        remoteIP := netutil.AddrIP(fd.RemoteAddr())
        if err := srv.checkInboundConn(remoteIP); err != nil {
            srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
            fd.Close()
            slots <- struct{}{}
            continue
        }
        if remoteIP != nil {
            var addr *net.TCPAddr
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
                addr = tcp
            }
            fd = newMeteredConn(fd, true, addr)
            srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        }
        go func() {
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{}
        }()
    }
}
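The slots channel in listenLoop is a counting semaphore: a connection is accepted only after a token has been taken, and the token is handed back when SetupConn returns. This caps the number of inbound handshakes in flight. The sketch below reproduces the pattern in isolation and measures the peak concurrency; the `runLimited` function is hypothetical and replaces the real connection handling with a counter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited starts n jobs but lets at most `tokens` of them run at once,
// using a buffered channel of slots as in listenLoop. It returns the peak
// number of jobs observed running at the same time.
func runLimited(n, tokens int) int32 {
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
		slots <- struct{}{}
	}
	var (
		wg            sync.WaitGroup
		running, peak int32
	)
	for i := 0; i < n; i++ {
		<-slots // wait for a free slot before starting the job
		wg.Add(1)
		go func() {
			defer wg.Done()
			cur := atomic.AddInt32(&running, 1)
			for { // record the highest concurrency seen so far
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt32(&running, -1)
			slots <- struct{}{} // hand the slot back
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runLimited(100, 3)) // never more than 3
}
```

Bounding pending connections this way limits how much work unauthenticated remote hosts can force the node to do at once.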

SetupConn runs the handshakes and attempts to turn the connection into a peer object:

// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
    c := &conn{fd: fd, flags: flags, cont: make(chan error)}
    if dialDest == nil {
        c.transport = srv.newTransport(fd, nil)
    } else {
        c.transport = srv.newTransport(fd, dialDest.Pubkey())
    }
    err := srv.setupConn(c, flags, dialDest)
    if err != nil {
        c.close(err)
    }
    return err
}

The code above calls srv.setupConn(c, flags, dialDest), which performs the handshakes:

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
    // Prevent leftover pending conns from entering the handshake.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }
    // If dialing, figure out the remote public key.
    var dialPubkey *ecdsa.PublicKey
    if dialDest != nil { // dialDest == nil: inbound connection; dialDest != nil: outbound (dialed) connection
        dialPubkey = new(ecdsa.PublicKey)
        if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
            err = errors.New("dial destination doesn't have a secp256k1 public key")
            srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
            return err
        }
    }
    // Run the RLPx handshake.
    remotePubkey, err := c.doEncHandshake(srv.PrivateKey) // exchange public keys and derive the shared secret; the RLPx handshake is one round trip
    if err != nil {
        srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
        return err
    }
    if dialDest != nil {
        c.node = dialDest
    } else {
        c.node = nodeFromConn(remotePubkey, c.fd)
    }
    clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
    err = srv.checkpoint(c, srv.checkpointPostHandshake)
    if err != nil {
        clog.Trace("Rejected peer", "err", err)
        return err
    }
    // Run the capability negotiation handshake.
    phs, err := c.doProtoHandshake(srv.ourHandshake) // protocol-level (p2p) handshake, also one round trip
    if err != nil {
        clog.Trace("Failed p2p handshake", "err", err)
        return err
    }
    if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
        clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
        return DiscUnexpectedIdentity
    }
    c.caps, c.name = phs.Caps, phs.Name
    err = srv.checkpoint(c, srv.checkpointAddPeer) // state check
    if err != nil {
        clog.Trace("Rejected peer", "err", err)
        return err
    }
    return nil
}

The key handshake is implemented by doEncHandshake, which calls Handshake():

// filedir:go-ethereum-1.10.2\p2p\transport.go  L123
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
    t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
    return t.conn.Handshake(prv)
}

Handshake is shown below. It runs the initiator or the receiver handshake logic depending on whether the local side dialed the connection:

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go   L253
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
    var (
        sec Secrets
        err error
    )
    if c.dialDest != nil { // initiator side
        sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest) // run the key handshake as initiator and derive the shared secrets
    } else { // receiver side
        sec, err = receiverEncHandshake(c.conn, prv)
    }
    if err != nil {
        return nil, err
    }
    c.InitWithSecrets(sec)
    return sec.remote, err
}

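The initiator side of the handshake is shown below. It calls makeAuthMsg to build the auth message, which contains a signature, a random nonce, the public key corresponding to the signature, and a version number. It then calls sealEIP8 to encode the message for RLPx, sends it to start the encrypted handshake, reads and decrypts the authResp reply to obtain the peer's public key, and finally derives the AES and MAC secrets: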

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go  L477
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
    h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
    authMsg, err := h.makeAuthMsg(prv)
    if err != nil {
        return s, err
    }
    authPacket, err := sealEIP8(authMsg, h)
    if err != nil {
        return s, err
    }
    if _, err = conn.Write(authPacket); err != nil {
        return s, err
    }
    authRespMsg := new(authRespV4)
    authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
    if err != nil {
        return s, err
    }
    if err := h.handleAuthResp(authRespMsg); err != nil {
        return s, err
    }
    return h.secrets(authPacket, authRespPacket)
}

receiverEncHandshake is shown below; it mirrors initiatorEncHandshake:

func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
    authMsg := new(authMsgV4)
    authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
    if err != nil {
        return s, err
    }
    h := new(encHandshake)
    if err := h.handleAuthMsg(authMsg, prv); err != nil {
        return s, err
    }
    authRespMsg, err := h.makeAuthResp()
    if err != nil {
        return s, err
    }
    var authRespPacket []byte
    if authMsg.gotPlain {
        authRespPacket, err = authRespMsg.sealPlain(h)
    } else {
        authRespPacket, err = sealEIP8(authRespMsg, h)
    }
    if err != nil {
        return s, err
    }
    if _, err = conn.Write(authRespPacket); err != nil {
        return s, err
    }
    return h.secrets(authPacket, authRespPacket)
}
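Both functions end in h.secrets(authPacket, authRespPacket), so each side derives the same session secrets from its own private key, the other side's public key, and the exchanged handshake data. The sketch below illustrates only that idea. It is not the RLPx derivation: it assumes the P-256 curve from crypto/ecdh instead of secp256k1, SHA-256 instead of Keccak-256, and a simplified mixing step; sessionSecrets, deriveSecrets, and handshakeDemo are hypothetical names.

```go
package main

import (
	"bytes"
	"crypto/ecdh"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
)

// sessionSecrets is a hypothetical stand-in for the AES and MAC secrets.
type sessionSecrets struct {
	AES []byte
	MAC []byte
}

func (s sessionSecrets) equal(o sessionSecrets) bool {
	return bytes.Equal(s.AES, o.AES) && bytes.Equal(s.MAC, o.MAC)
}

func concat(parts ...[]byte) []byte {
	var out []byte
	for _, p := range parts {
		out = append(out, p...)
	}
	return out
}

// deriveSecrets mixes the ECDH shared secret with both nonces.
// Assumption: simplified derivation, not the one used by RLPx.
func deriveSecrets(priv *ecdh.PrivateKey, remote *ecdh.PublicKey, initNonce, respNonce []byte) (sessionSecrets, error) {
	shared, err := priv.ECDH(remote)
	if err != nil {
		return sessionSecrets{}, err
	}
	aes := sha256.Sum256(concat(shared, initNonce, respNonce))
	mac := sha256.Sum256(concat(shared, aes[:]))
	return sessionSecrets{AES: aes[:], MAC: mac[:]}, nil
}

// handshakeDemo plays both roles locally and returns what each side derived.
func handshakeDemo() (initiator, receiver sessionSecrets, err error) {
	curve := ecdh.P256()
	initKey, err := curve.GenerateKey(rand.Reader)
	if err != nil {
		return
	}
	recvKey, err := curve.GenerateKey(rand.Reader)
	if err != nil {
		return
	}
	initNonce, respNonce := make([]byte, 32), make([]byte, 32)
	if _, err = rand.Read(initNonce); err != nil {
		return
	}
	if _, err = rand.Read(respNonce); err != nil {
		return
	}
	// Each side combines its own private key with the other side's public key.
	initiator, err = deriveSecrets(initKey, recvKey.PublicKey(), initNonce, respNonce)
	if err != nil {
		return
	}
	receiver, err = deriveSecrets(recvKey, initKey.PublicKey(), initNonce, respNonce)
	return
}

func main() {
	a, b, err := handshakeDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println("both sides derived identical secrets:", a.equal(b))
}
```

The secrets never travel over the wire. Only public keys and nonces are exchanged, and an observer without one of the private keys cannot compute the shared value.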

Next, doProtoHandshake completes the protocol handshake. It calls Send to transmit our handshake message, reads the remote side's reply with readProtocolHandshake, and then checks the results:

func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
    // Write our handshake concurrently: the remote side sends its handshake
    // at the same time, so reading and writing must not block each other.
    werr := make(chan error, 1)
    go func() { werr <- Send(t, handshakeMsg, our) }()
    if their, err = readProtocolHandshake(t); err != nil {
        <-werr // make sure the write terminates too
        return nil, err
    }
    if err := <-werr; err != nil {
        return nil, fmt.Errorf("write error: %v", err)
    }
    // If the protocol version supports Snappy encoding, upgrade immediately
    t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
    return their, nil
}
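doProtoHandshake sends in a separate goroutine because both peers send their handshake message at the same moment. If each side wrote first and read afterwards on a connection without buffering, both writes could block forever. A minimal sketch of the same pattern follows, assuming a synchronous net.Pipe connection and newline-terminated text messages; exchangeHello and demoExchange are hypothetical names.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// exchangeHello writes our message in a goroutine while reading the
// peer's message on the calling goroutine, then collects the write result.
func exchangeHello(conn net.Conn, our string) (string, error) {
	werr := make(chan error, 1) // buffered, so the writer never blocks on it
	go func() {
		_, err := fmt.Fprintf(conn, "%s\n", our)
		werr <- err
	}()
	their, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		<-werr // wait for the write to terminate as well
		return "", err
	}
	if err := <-werr; err != nil {
		return "", fmt.Errorf("write error: %v", err)
	}
	return their[:len(their)-1], nil
}

// demoExchange runs both peers over an in-memory pipe and returns what
// each side received.
func demoExchange() (gotA, gotB string, err error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	type result struct {
		msg string
		err error
	}
	ch := make(chan result, 1)
	go func() {
		msg, err := exchangeHello(b, "hello from B")
		ch <- result{msg, err}
	}()
	gotA, err = exchangeHello(a, "hello from A")
	if err != nil {
		return "", "", err
	}
	r := <-ch
	return gotA, r.msg, r.err
}

func main() {
	gotA, gotB, err := demoExchange()
	fmt.Println(gotA, "|", gotB, "|", err)
}
```

net.Pipe is fully synchronous, so this exchange only completes because the write and the read on each side run concurrently.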

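Service loop

The run function is the server's main loop. It listens for server shutdown, the addition and removal of trusted nodes, the checks performed before a peer is added, and similar events: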
func (srv *Server) run() {
    srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
    defer srv.loopWG.Done()
    defer srv.nodedb.Close()
    defer srv.discmix.Close()
    defer srv.dialsched.stop()

    var (
        peers        = make(map[enode.ID]*Peer)
        inboundCount = 0
        trusted      = make(map[enode.ID]bool, len(srv.TrustedNodes))
    )
    // Put the configured trusted nodes into a map for fast lookups.
    for _, n := range srv.TrustedNodes {
        trusted[n.ID()] = true
    }

running:
    for {
        select {
        case <-srv.quit:
            // The server was stopped: leave the loop and clean up.
            break running

        case n := <-srv.addtrusted:
            // Mark the node as trusted and flag an existing connection to it.
            srv.log.Trace("Adding trusted node", "node", n)
            trusted[n.ID()] = true
            if p, ok := peers[n.ID()]; ok {
                p.rw.set(trustedConn, true)
            }

        case n := <-srv.removetrusted:
            // Remove the node from the trusted set and clear the flag.
            srv.log.Trace("Removing trusted node", "node", n)
            delete(trusted, n.ID())
            if p, ok := peers[n.ID()]; ok {
                p.rw.set(trustedConn, false)
            }

        case op := <-srv.peerOp:
            // Run a caller-supplied operation on the peer set.
            op(peers)
            srv.peerOpDone <- struct{}{}

        case c := <-srv.checkpointPostHandshake:
            // A connection has passed the encryption handshake.
            if trusted[c.node.ID()] {
                // Set the trusted flag before running the checks.
                c.flags |= trustedConn
            }
            c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)

        case c := <-srv.checkpointAddPeer:
            // The connection has passed the protocol handshake.
            err := srv.addPeerChecks(peers, inboundCount, c)
            if err == nil {
                // All checks passed: start the peer and record it.
                p := srv.launchPeer(c)
                peers[c.node.ID()] = p
                srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
                srv.dialsched.peerAdded(c)
                if p.Inbound() {
                    inboundCount++
                }
            }
            c.cont <- err

        case pd := <-srv.delpeer:
            // A peer disconnected.
            d := common.PrettyDuration(mclock.Now() - pd.created)
            delete(peers, pd.ID())
            srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
            srv.dialsched.peerRemoved(pd.rw)
            if pd.Inbound() {
                inboundCount--
            }
        }
    }

    srv.log.Trace("P2P networking is spinning down")

    // Stop the discovery services.
    if srv.ntab != nil {
        srv.ntab.Close()
    }
    if srv.DiscV5 != nil {
        srv.DiscV5.Close()
    }
    // Ask every remaining peer to disconnect.
    for _, p := range peers {
        p.Disconnect(DiscQuitting)
    }
    // Wait until all peers have reported that they shut down.
    for len(peers) > 0 {
        p := <-srv.delpeer
        p.log.Trace("<-delpeer (spindown)")
        delete(peers, p.ID())
    }
}
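The peers map in run is only ever touched by the loop goroutine, so it needs no mutex. Other goroutines that want to inspect it send a closure over srv.peerOp and wait on srv.peerOpDone. A minimal sketch of this ownership pattern follows, assuming string peer IDs; the registry type and its fields are hypothetical.

```go
package main

import "fmt"

// registry owns a peer set inside a single goroutine (hypothetical type).
type registry struct {
	add    chan string
	del    chan string
	op     chan func(map[string]bool)
	opDone chan struct{}
	quit   chan struct{}
	done   chan struct{}
}

func newRegistry() *registry {
	r := &registry{
		add:    make(chan string),
		del:    make(chan string),
		op:     make(chan func(map[string]bool)),
		opDone: make(chan struct{}),
		quit:   make(chan struct{}),
		done:   make(chan struct{}),
	}
	go r.run()
	return r
}

// run is the only code that reads or writes the peers map.
func (r *registry) run() {
	defer close(r.done)
	peers := make(map[string]bool)
	for {
		select {
		case id := <-r.add:
			peers[id] = true
		case id := <-r.del:
			delete(peers, id)
		case fn := <-r.op:
			fn(peers) // executed on the loop goroutine
			r.opDone <- struct{}{}
		case <-r.quit:
			return
		}
	}
}

// count asks the loop to report the number of peers.
func (r *registry) count() int {
	var n int
	r.op <- func(p map[string]bool) { n = len(p) }
	<-r.opDone // the closure has finished once this receive returns
	return n
}

func main() {
	r := newRegistry()
	r.add <- "a"
	r.add <- "b"
	r.del <- "a"
	fmt.Println("peers:", r.count())
	close(r.quit)
	<-r.done
}
```

Because every event is handled one at a time inside the select, the loop itself is the synchronisation point, which keeps the state transitions easy to reason about.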

Node information

NodeInfo returns information about the local node, and PeersInfo returns information about the connected peers:
// NodeInfo gathers and returns a collection of metadata known about the host.
func (srv *Server) NodeInfo() *NodeInfo {
    // Gather and assemble the generic node infos
    node := srv.Self()
    info := &NodeInfo{
        Name:       srv.Name,
        Enode:      node.URLv4(),
        ID:         node.ID().String(),
        IP:         node.IP().String(),
        ListenAddr: srv.ListenAddr,
        Protocols:  make(map[string]interface{}),
    }
    info.Ports.Discovery = node.UDP()
    info.Ports.Listener = node.TCP()
    info.ENR = node.String()

    // Gather all the running protocol infos (only once per protocol type)
    for _, proto := range srv.Protocols {
        if _, ok := info.Protocols[proto.Name]; !ok {
            nodeInfo := interface{}("unknown")
            if query := proto.NodeInfo; query != nil {
                nodeInfo = proto.NodeInfo()
            }
            info.Protocols[proto.Name] = nodeInfo
        }
    }
    return info
}

func (srv *Server) PeersInfo() []*PeerInfo {
    // Gather all the generic and sub-protocol specific infos
    infos := make([]*PeerInfo, 0, srv.PeerCount())
    for _, peer := range srv.Peers() {
        if peer != nil {
            infos = append(infos, peer.Info())
        }
    }
    // Sort the result array alphabetically by node identifier
    for i := 0; i < len(infos); i++ {
        for j := i + 1; j < len(infos); j++ {
            if infos[i].ID > infos[j].ID {
                infos[i], infos[j] = infos[j], infos[i]
            }
        }
    }
    return infos
}
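PeersInfo orders its result by node ID with a hand-written nested loop. For comparison, here is a minimal sketch that produces the same ordering with sort.Slice from the standard library; peerInfo is a hypothetical, reduced stand-in for PeerInfo, and this is an illustrative alternative rather than what go-ethereum does.

```go
package main

import (
	"fmt"
	"sort"
)

// peerInfo is a reduced, hypothetical stand-in for p2p.PeerInfo.
type peerInfo struct {
	ID   string
	Name string
}

// sortByID orders the slice by ascending ID, in place.
func sortByID(infos []*peerInfo) {
	sort.Slice(infos, func(i, j int) bool { return infos[i].ID < infos[j].ID })
}

func main() {
	infos := []*peerInfo{{ID: "c3", Name: "gamma"}, {ID: "a1", Name: "alpha"}, {ID: "b2", Name: "beta"}}
	sortByID(infos)
	for _, p := range infos {
		fmt.Println(p.ID, p.Name)
	}
}
```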

Request handling

The code of the peer.run function is shown below:

func (p *Peer) run() (remoteRequested bool, err error) {
    var (
        writeStart = make(chan struct{}, 1)
        writeErr   = make(chan error, 1)
        readErr    = make(chan error, 1)
        reason     DiscReason // sent to the peer
    )
    p.wg.Add(2)
    go p.readLoop(readErr)
    go p.pingLoop()
    // Start all protocol handlers.
    writeStart <- struct{}{}
    p.startProtocols(writeStart, writeErr)
    // Wait for an error or disconnect.
loop:
    for {
        select {
        case err = <-writeErr:
            // A write finished. Allow the next write to start if
            // there was no error.
            if err != nil {
                reason = DiscNetworkError
                break loop
            }
            writeStart <- struct{}{}
        case err = <-readErr:
            if r, ok := err.(DiscReason); ok {
                remoteRequested = true
                reason = r
            } else {
                reason = DiscNetworkError
            }
            break loop
        case err = <-p.protoErr:
            reason = discReasonForError(err)
            break loop
        case err = <-p.disc:
            reason = discReasonForError(err)
            break loop
        }
    }
    close(p.closed)
    p.rw.close(reason)
    p.wg.Wait()
    return remoteRequested, err
}

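As the code above shows, the function first declares a few local variables and then starts two goroutines. The first is readLoop, which reads a message by calling ReadMsg() and then passes it to peer.handle(msg) for processing.
If the message is a pingMsg, a pong is sent in reply. If the message matches none of the special cases handled there, it is forwarded to the proto.in channel, where it waits to be picked up by protocolManager.handleMsg(). The second goroutine is pingLoop, which issues ping requests by calling SendItems(p.rw, pingMsg).
After that, startProtocols() is called to get the protocols running: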

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
    p.wg.Add(len(p.running))
    for _, proto := range p.running {
        proto := proto
        proto.closed = p.closed
        proto.wstart = writeStart
        proto.werr = writeErr
        var rw MsgReadWriter = proto
        if p.events != nil {
            rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
        }
        p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
        go func() {
            defer p.wg.Done()
            err := proto.Run(p, rw)
            if err == nil {
                p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
                err = errProtocolReturned
            } else if err != io.EOF {
                p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
            }
            p.protoErr <- err
        }()
    }
}

Finally, a loop handles errors and disconnects:

    // Wait for an error or disconnect.
loop:
    for {
        select {
        case err = <-writeErr:
            // A write finished. Allow the next write to start if
            // there was no error.
            if err != nil {
                reason = DiscNetworkError
                break loop
            }
            writeStart <- struct{}{}
        case err = <-readErr:
            if r, ok := err.(DiscReason); ok {
                remoteRequested = true
                reason = r
            } else {
                reason = DiscNetworkError
            }
            break loop
        case err = <-p.protoErr:
            reason = discReasonForError(err)
            break loop
        case err = <-p.disc:
            reason = discReasonForError(err)
            break loop
        }
    }
    close(p.closed)
    p.rw.close(reason)
    p.wg.Wait()
    return remoteRequested, err
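The writeStart and writeErr channels act as a write token. A protocol may write only after it has received a value from writeStart, and the loop hands the token back only after the previous write has reported its result on writeErr, so writes from different protocols never overlap. A minimal sketch of that mechanism follows, assuming a strings.Builder in place of the network connection; tokenWriter and runWriters are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// tokenWriter may write only while it holds the token taken from wstart.
type tokenWriter struct {
	wstart <-chan struct{}
	werr   chan<- error
	out    *strings.Builder // stand-in for the shared connection
}

func (w *tokenWriter) write(msg string) {
	<-w.wstart // wait for the write token
	_, err := w.out.WriteString(msg)
	w.werr <- err // report the result; the loop returns the token
}

// runWriters starts one goroutine per message and serialises their writes.
func runWriters(msgs []string) string {
	var (
		writeStart = make(chan struct{}, 1)
		writeErr   = make(chan error, 1)
		out        strings.Builder
		wg         sync.WaitGroup
	)
	writeStart <- struct{}{} // the first write may start immediately
	for _, m := range msgs {
		wg.Add(1)
		w := &tokenWriter{wstart: writeStart, werr: writeErr, out: &out}
		go func(m string) {
			defer wg.Done()
			w.write(m)
		}(m)
	}
	for range msgs {
		// A write finished. strings.Builder never fails, so this sketch
		// always allows the next write to start.
		<-writeErr
		writeStart <- struct{}{}
	}
	wg.Wait()
	return out.String()
}

func main() {
	fmt.Println(len(runWriters([]string{"aa", "bb", "cc"})))
}
```

The order in which the goroutines obtain the token is not deterministic, but at any moment at most one of them is writing.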

Creating the database

The newPersistentDB function creates a persistent database for storing node information:
// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L95
// newPersistentDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*DB, error) {
    opts := &opt.Options{OpenFilesCacheCapacity: 5}
    db, err := leveldb.OpenFile(path, opts)
    if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
        db, err = leveldb.RecoverFile(path, nil)
    }
    if err != nil {
        return nil, err
    }
    // The nodes contained in the cache correspond to a certain protocol version.
    // Flush all nodes if the version doesn't match.
    currentVer := make([]byte, binary.MaxVarintLen64)
    currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]

    blob, err := db.Get([]byte(dbVersionKey), nil)
    switch err {
    case leveldb.ErrNotFound:
        // Version not found (i.e. empty cache), insert it
        if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
            db.Close()
            return nil, err
        }

    case nil:
        // Version present, flush if different
        if !bytes.Equal(blob, currentVer) {
            db.Close()
            if err = os.RemoveAll(path); err != nil {
                return nil, err
            }
            return newPersistentDB(path)
        }
    }
    return &DB{lvl: db, quit: make(chan struct{})}, nil
}
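The version handling in newPersistentDB has two cases: an empty database gets the current version written into it, and a database carrying a different version is discarded and recreated. A minimal sketch of that decision follows, assuming an in-memory map in place of LevelDB; versionKey, encodeVersion, and openStore are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const versionKey = "version" // hypothetical key name

// encodeVersion stores the version as a varint, trimmed to its real length.
func encodeVersion(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	return buf[:binary.PutVarint(buf, v)]
}

// openStore returns the store to use and reports whether the old
// contents were flushed because of a version mismatch.
func openStore(store map[string][]byte, want int64) (map[string][]byte, bool) {
	cur := encodeVersion(want)
	blob, ok := store[versionKey]
	switch {
	case !ok:
		// Version not found (empty store): insert it.
		store[versionKey] = cur
		return store, false
	case !bytes.Equal(blob, cur):
		// Version present but different: start over with a fresh store.
		return map[string][]byte{versionKey: cur}, true
	}
	return store, false
}

func main() {
	old := map[string][]byte{versionKey: encodeVersion(8), "n:abc": []byte("record")}
	fresh, flushed := openStore(old, 9)
	fmt.Println("flushed:", flushed, "entries left:", len(fresh))
}
```

Flushing on a mismatch is a blunt but safe choice here: the stored node records are only a cache, so losing them costs a slower bootstrap and nothing more.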

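Node expiration

The ensureExpirer function starts the background check for expired nodes, and db.runner.Do makes sure that the check is launched only once. The implementation is shown below: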
func (db *DB) ensureExpirer() {
    db.runner.Do(func() { go db.expirer() })
}

// expirer runs expireNodes once per cleanup cycle until the database is closed.
func (db *DB) expirer() {
    tick := time.NewTicker(dbCleanupCycle)
    defer tick.Stop()
    for {
        select {
        case <-tick.C:
            db.expireNodes()
        case <-db.quit:
            return
        }
    }
}

// expireNodes iterates over the database and deletes all nodes that have
// not been seen (i.e. received a pong from) for some time.
func (db *DB) expireNodes() {
    it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
    defer it.Release()
    if !it.Next() {
        return
    }

    var (
        threshold    = time.Now().Add(-dbNodeExpiration).Unix()
        youngestPong int64
        atEnd        = false
    )
    for !atEnd {
        id, ip, field := splitNodeItemKey(it.Key())
        if field == dbNodePong {
            time, _ := binary.Varint(it.Value())
            if time > youngestPong {
                youngestPong = time
            }
            if time < threshold {
                // Last pong from this IP older than threshold, remove fields belonging to it.
                deleteRange(db.lvl, nodeItemKey(id, ip, ""))
            }
        }
        atEnd = !it.Next()
        nextID, _ := splitNodeKey(it.Key())
        if atEnd || nextID != id {
            // We've moved beyond the last entry of the current ID.
            // Remove everything if there was no recent enough pong.
            if youngestPong > 0 && youngestPong < threshold {
                deleteRange(db.lvl, nodeKey(id))
            }
            youngestPong = 0
        }
    }
}
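expireNodes applies the threshold at two levels: the fields stored for one IP are dropped when that IP's last pong is too old, and the whole node is dropped when even its most recent pong is too old. A minimal sketch of that rule follows, assuming nested in-memory maps in place of the LevelDB iterator and plain Unix seconds for time; nodeEntry and this expireNodes signature are hypothetical.

```go
package main

import "fmt"

// nodeEntry is a hypothetical in-memory stand-in for one node's records.
type nodeEntry struct {
	record   string
	lastPong map[string]int64 // IP -> Unix time of the last pong
}

// expireNodes removes stale per-IP entries and whole nodes whose youngest
// pong is older than now-maxAge. It returns the IDs of the removed nodes.
func expireNodes(db map[string]*nodeEntry, now, maxAge int64) (removed []string) {
	threshold := now - maxAge
	for id, n := range db {
		var youngest int64
		for ip, t := range n.lastPong {
			if t > youngest {
				youngest = t
			}
			if t < threshold {
				// Last pong from this IP is too old: drop its fields.
				delete(n.lastPong, ip)
			}
		}
		if youngest > 0 && youngest < threshold {
			// No recent enough pong from any IP: drop the whole node.
			delete(db, id)
			removed = append(removed, id)
		}
	}
	return removed
}

func main() {
	db := map[string]*nodeEntry{
		"a": {record: "enr-a", lastPong: map[string]int64{"1.1.1.1": 100, "2.2.2.2": 900}},
		"b": {record: "enr-b", lastPong: map[string]int64{"3.3.3.3": 50}},
	}
	fmt.Println("removed:", expireNodes(db, 1000, 500), "remaining:", len(db))
}
```

A node with no pong entry at all keeps youngest at 0 and is therefore left alone, which matches the youngestPong > 0 guard in the original code.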

State updates

The following functions read and update per-node state:
// LastPingReceived retrieves the time of the last ping packet received from
// a remote node.
func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
    if ip = ip.To16(); ip == nil {
        return time.Time{}
    }
    return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
}

// UpdateLastPingReceived updates the last time we tried contacting a remote node.
func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
    if ip = ip.To16(); ip == nil {
        return errInvalidIP
    }
    return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
}

// LastPongReceived retrieves the time of the last successful pong from remote node.
func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
    if ip = ip.To16(); ip == nil {
        return time.Time{}
    }
    // Launch expirer
    db.ensureExpirer()
    return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
}

// UpdateLastPongReceived updates the last pong time of a node.
func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
    if ip = ip.To16(); ip == nil {
        return errInvalidIP
    }
    return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
}

// FindFails retrieves the number of findnode failures since bonding.
func (db *DB) FindFails(id ID, ip net.IP) int {
    if ip = ip.To16(); ip == nil {
        return 0
    }
    return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
}

// UpdateFindFails updates the number of findnode failures since bonding.
func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
    if ip = ip.To16(); ip == nil {
        return errInvalidIP
    }
    return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
}

// FindFailsV5 retrieves the discv5 findnode failure counter.
func (db *DB) FindFailsV5(id ID, ip net.IP) int {
    if ip = ip.To16(); ip == nil {
        return 0
    }
    return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails)))
}

// UpdateFindFailsV5 stores the discv5 findnode failure counter.
func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {
    if ip = ip.To16(); ip == nil {
        return errInvalidIP
    }
    return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
}
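All of these accessors share one shape: normalise the IP with To16, reject it if that fails, build a key from the node ID, the IP, and a field name, and then read or write an int64. A minimal sketch of that shape follows, assuming an in-memory map, string node IDs, and a made-up key layout; store, itemKey, updateLastPong, and lastPong are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// store is a hypothetical in-memory stand-in for the LevelDB handle.
type store map[string][]byte

// itemKey builds a per-node, per-IP key. The layout is an assumption.
func itemKey(id string, ip net.IP, field string) (string, bool) {
	ip16 := ip.To16() // nil for an invalid address
	if ip16 == nil {
		return "", false
	}
	return "n:" + id + ":" + string(ip16) + ":" + field, true
}

func (s store) storeInt64(key string, n int64) {
	buf := make([]byte, binary.MaxVarintLen64)
	s[key] = buf[:binary.PutVarint(buf, n)]
}

func (s store) fetchInt64(key string) int64 {
	v, read := binary.Varint(s[key])
	if read <= 0 {
		return 0 // missing or malformed entry
	}
	return v
}

// updateLastPong records a pong time and reports whether the IP was valid.
func updateLastPong(s store, id, ipStr string, unix int64) bool {
	key, ok := itemKey(id, net.ParseIP(ipStr), "lastpong")
	if !ok {
		return false
	}
	s.storeInt64(key, unix)
	return true
}

// lastPong returns the recorded pong time, or 0 if there is none.
func lastPong(s store, id, ipStr string) int64 {
	key, ok := itemKey(id, net.ParseIP(ipStr), "lastpong")
	if !ok {
		return 0
	}
	return s.fetchInt64(key)
}

func main() {
	s := store{}
	fmt.Println(updateLastPong(s, "abc", "10.0.0.1", 1700000000))
	fmt.Println(lastPong(s, "abc", "10.0.0.1"))
	fmt.Println(updateLastPong(s, "abc", "not-an-ip", 1))
}
```

Normalising to the 16-byte form means an IPv4 address and its IPv4-mapped IPv6 spelling resolve to the same key, so a node is not tracked twice under two encodings of one address.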

Node selection

The QuerySeeds function picks suitable seed nodes at random from the database:

// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
    var (
        now   = time.Now()
        nodes = make([]*Node, 0, n)
        it    = db.lvl.NewIterator(nil, nil)
        id    ID
    )
    defer it.Release()
seek:
    for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {

        ctr := id[0]
        rand.Read(id[:])
        id[0] = ctr + id[0]%16
        it.Seek(nodeKey(id))
        n := nextNode(it)
        if n == nil {
            id[0] = 0
            continue seek // iterator exhausted
        }
        if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
            continue seek
        }
        for i := range nodes {
            if nodes[i].ID() == n.ID() {
                continue seek // duplicate
            }
        }
        nodes = append(nodes, n)
    }
    return nodes
}

func nextNode(it iterator.Iterator) *Node {
    for end := false; !end; end = !it.Next() {
        id, rest := splitNodeKey(it.Key())
        if string(rest) != dbDiscoverRoot {
            continue
        }
        return mustDecodeNode(id[:], it.Value())
    }
    return nil
}
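QuerySeeds selects nodes by seeking to a random position in the ordered key space and taking the next node found there, skipping candidates whose last pong is too old and candidates that were already picked, with the number of attempts capped at five times the requested count. A minimal sketch of that sampling idea follows, assuming a sorted slice with sort.Search in place of the LevelDB iterator, 32-bit IDs, and a caller-supplied seed. It does not reproduce the first-byte adjustment of the original, and node and querySeeds are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// node is a hypothetical reduced record: an ID and the age of its last pong.
type node struct {
	id      uint32
	pongAge int // seconds since the last pong
}

// querySeeds picks up to n distinct, recently seen nodes from a slice that
// is sorted by id, using random seek positions.
func querySeeds(sorted []node, n, maxAge int, seed int64) []node {
	rng := rand.New(rand.NewSource(seed))
	picked := make([]node, 0, n)
seek:
	for seeks := 0; len(picked) < n && seeks < n*5; seeks++ {
		// Seek to the first node whose id is >= a random target.
		target := rng.Uint32()
		i := sort.Search(len(sorted), func(i int) bool { return sorted[i].id >= target })
		if i == len(sorted) {
			continue seek // ran past the last node
		}
		cand := sorted[i]
		if cand.pongAge > maxAge {
			continue seek // not seen recently enough
		}
		for _, p := range picked {
			if p.id == cand.id {
				continue seek // duplicate
			}
		}
		picked = append(picked, cand)
	}
	return picked
}

func main() {
	nodes := []node{
		{id: 0x10000000, pongAge: 10},
		{id: 0x50000000, pongAge: 9999},
		{id: 0x90000000, pongAge: 20},
		{id: 0xd0000000, pongAge: 30},
	}
	fmt.Println(len(querySeeds(nodes, 3, 3600, 42)) <= 3)
}
```

The attempt cap matters: when the database holds fewer fresh nodes than requested, the loop still terminates and returns whatever it found.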

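Summary

P2P networking is the foundation of a blockchain's distributed network structure. This article described the basic principles of P2P networks, including the node discovery mechanism, the distributed hash table, node lookup, node addition, node removal, and request handling. It also analysed, at the source-code level, how the P2P network is implemented in Ethereum, and explored the workflow and security design of Ethereum's P2P network. Building a complete security architecture for public chains remains a long road that calls for further exploration.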