以太坊源码研读0xa一,以太坊多元之7

dial.go阅读手记

dial.go是肩负和peer建立连接关系的地点,主借使贯彻

type dialer interface {
/*
    peers已经有的结点
 */
    newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task
    taskDone(task, time.Time)
    addStatic(*discover.Node)
    removeStatic(*discover.Node)
}
// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
type dialstate struct {
    maxDynDials int
    ntab        discoverTable
    netrestrict *netutil.Netlist

    lookupRunning bool
    dialing       map[discover.NodeID]connFlag  //正在创建的连接
    lookupBuf     []*discover.Node // current discovery lookup results
    randomNodes   []*discover.Node // filled from Table
    static        map[discover.NodeID]*dialTask
    hist          *dialHistory

    start     time.Time        // time when the dialer was first used
    bootnodes []*discover.Node // default dials when there are no peers
}

在那之中最复杂的是newTasks,是树立新的接连,从test代码中得以观看,
要在钦赐的最艾哈迈达巴德接数(peers)基之上去制造新的连日

lookupBuf // current discovery lookup results
重中之重是在得了taskdone的时候拉长已经发现的?

在不当先maxDynDials的情事下,首先减去peers已有的一连,然后是static中的职分,
假使过还有多余,富余空间中最多八分之四(依据贯彻,恐怕为0)用ReadRandomNodes填充,剩下的
就用lookupBuf来填充,假若lookbuf中绝非可行的职责,那么就创办3个discoverTask,
一经还有空间,就创制2个waitExpireTask

同理可得newTasks就是在尽量的状态下,多创制职责.为Server.run
scheduleTasks时劳务,
保障其可以接连到尽大概多的节点.假诺节点数量不够的气象下,还从未dialTask就成立discoverTask,不然就创办一个WaitExpireTask

dial

下边包车型大巴table类完成了Kademlia算法,udp完结了意识节点时节点间的网络通信。发现节点后,就能够对三个节点发起连接了。devp2p中肩负在两个节点建立连接的的正是dial类。

// NodeDialer is used to connect to nodes in the network, typically by using// an underlying net.Dialer but also using net.Pipe in tests// 连接网络中的节点,通常使用底层的net.Dialer,但也在测试中使用net.Pipetype NodeDialer interface { Dial(*discover.Node) (net.Conn, error)}// TCPDialer implements the NodeDialer interface by using a net.Dialer to// create TCP connections to nodes in the network// 通过使用net.Dialer创建与网络中节点的TCP连接来实现NodeDialer接口type TCPDialer struct { *net.Dialer}// Dial creates a TCP connection to the node// 与节点创建一个tcp连接func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) { addr := &net.TCPAddr{IP: dest.IP, Port: int} return t.Dialer.Dial("tcp", addr.String}// dialstate schedules dials and discovery lookups.// it get's a chance to compute new tasks on every iteration// of the main loop in Server.run.type dialstate struct { // 最大的动态节点连接数 maxDynDials int // discoverTable接口实现节点查询 ntab discoverTable netrestrict *netutil.Netlist lookupRunning bool // 正在连接的节点 dialing map[discover.NodeID]connFlag // 当前查询的节点结果 lookupBuf []*discover.Node // current discovery lookup results // 从k桶表随机查询的节点 randomNodes []*discover.Node // filled from Table // 静态节点 static map[discover.NodeID]*dialTask // 连接历史 hist *dialHistory // dialer首次使用的时间 start time.Time // time when the dialer was first used // 内置节点,没有找到其他节点 连接这些节点 bootnodes []*discover.Node // default dials when there are no peers}type discoverTable interface { Self() *discover.Node Close() Resolve(target discover.NodeID) *discover.Node Lookup(target discover.NodeID) []*discover.Node ReadRandomNodes([]*discover.Node) int}// the dial history remembers recent dials.type dialHistory []pastDial// pastDial is an entry in the dial history.type pastDial struct { id discover.NodeID exp time.Time}type task interface { Do}

那里有个接口定义了Do方法,同时能够看到上边有二种不相同的task,可知每一种task都会有照应的Do方法来拍卖task。

type task interface { Do}// A dialTask is generated for each node that is dialed. Its// fields cannot be accessed while the task is running.// 每个连接的节点会生成一个dialTasktype dialTask struct { flags connFlag dest *discover.Node lastResolved time.Time resolveDelay time.Duration}// discoverTask runs discovery table operations.// Only one discoverTask is active at any time.// discoverTask.Do performs a random lookup.// 发现节点任务type discoverTask struct { results []*discover.Node}// A waitExpireTask is generated if there are no other tasks// to keep the loop in Server.run ticking.// 如果没有任务在server.run中循环就会生成waitExpireTask任务type waitExpireTask struct { time.Duration}

有二种档次的task,那么怎么来生成1个task呢?

// 新建一个任务func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now } var newtasks []task // 检查节点,然后设置状态,最后把节点加入newtasks队列 addDial := func(flag connFlag, n *discover.Node) bool { if err := s.checkDial; err != nil { log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int}, "err", err) return false } s.dialing[n.ID] = flag newtasks = append(newtasks, &dialTask{flags: flag, dest: n}) return true } // Compute number of dynamic dials necessary at this point. // 计算所需的动态连接数 needDynDials := s.maxDynDials // 首先统计已经建立连接的节点中动态连接数 for _, p := range peers { // 动态类型 if p.rw.is(dynDialedConn) { needDynDials-- } } // 其次统计正在建立的连接的动态连接数 for _, flag := range s.dialing { if flag&dynDialedConn != 0 { needDynDials-- } } // Expire the dial history on every invocation. // 每次调用使连接记录到期 s.hist.expire // Create dials for static nodes if they are not connected. // 为所有静态节点建立连接 for id, t := range s.static { err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err) delete(s.static, t.dest.ID) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } } // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. // 当前还没有任何连接,并且fallbackInterval时间内仍未创建连接 使用内置节点 if len == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub > fallbackInterval { bootnode := s.bootnodes[0] s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...) s.bootnodes = append(s.bootnodes, bootnode) if addDial(dynDialedConn, bootnode) { needDynDials-- } } // Use random nodes from the table for half of the necessary // dynamic dials. // 使用1/2的随机节点创建连接 randomCandidates := needDynDials / 2 if randomCandidates > 0 { n := s.ntab.ReadRandomNodes(s.randomNodes) for i := 0; i < randomCandidates && i < n; i++ { if addDial(dynDialedConn, s.randomNodes[i]) { needDynDials-- } } } // Create dynamic dials from random lookup results, removing tried // items from the result buffer. // 为随机查找的节点创建动态连接,并从结果缓冲区中删除尝试的节点 i := 0 for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { if addDial(dynDialedConn, s.lookupBuf[i]) { needDynDials-- } } s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] // Launch a discovery lookup if more candidates are needed. // 如果还需要更多的连接,则启动发现节点 if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true newtasks = append(newtasks, &discoverTask{}) } // Launch a timer to wait for the next node to expire if all // candidates have been tried and no task is currently active. // This should prevent cases where the dialer logic is not ticked // because there are no pending events. // 如果当前没有任何任务,创建一个waitExpireTask if nRunning == 0 && len == 0 && s.hist.Len() > 0 { t := &waitExpireTask{s.hist.min().exp.Sub} newtasks = append(newtasks, t) } return newtasks}

里面包车型客车checkDial方法用来检查是否须要树立连接。

// 检查dial状态func (s *dialstate) checkDial(n *discover.Node, peers map[discover.NodeID]*Peer) error { _, dialing := s.dialing[n.ID] switch { case dialing: // 正在创建 return errAlreadyDialing case peers[n.ID] != nil // 已经创建过连接 return errAlreadyConnected case s.ntab != nil && n.ID == s.ntab.Self().ID: // 创建的对象不是自己 return errSelf case s.netrestrict != nil && !s.netrestrict.Contains: // 网络限制。对方IP不在白名单 return errNotWhitelisted case s.hist.contains: return errRecentlyDialed } return nil}

开创任务之后,针对分歧的task会有两样的Do达成。大家3个一个来看那多少个task的Do处理。首先来看看dialTask的Do处理,那里的dialTask首要在两个节点之间创造连接。

func (t *dialTask) Do(srv *Server) { // 目标节点dest ip地址为空 使用resolve方法去查找目标节点并解析出ip地址 if t.dest.Incomplete() { if !t.resolve { return } } // 建立连接 err := t.dial(srv, t.dest) if err != nil { log.Trace("Dial error", "task", t, "err", err) // Try resolving the ID of static nodes if dialing failed. // 如果是静态节点连接失败,尝试重新解析其节点ip地址 因为静态节点的ip是配置的,可能发生变动 if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { if t.resolve { t.dial(srv, t.dest) } } }}// resolve attempts to find the current endpoint for the destination// using discovery.//// Resolve operations are throttled with backoff to avoid flooding the// discovery network with useless queries for nodes that don't exist.// The backoff delay resets when the node is found.// 当目标节点ip地址为空时使用该方法发现节点并解析ip地址func (t *dialTask) resolve(srv *Server) bool { if srv.ntab == nil { log.Debug("Can't resolve node", "id", t.dest.ID, "err", "discovery is disabled") return false } if t.resolveDelay == 0 { t.resolveDelay = initialResolveDelay } if time.Since(t.lastResolved) < t.resolveDelay { return false } // 查找到节点 resolved := srv.ntab.Resolve(t.dest.ID) t.lastResolved = time.Now() if resolved == nil { t.resolveDelay *= 2 if t.resolveDelay > maxResolveDelay { t.resolveDelay = maxResolveDelay } log.Debug("Resolving node failed", "id", t.dest.ID, "newdelay", t.resolveDelay) return false } // The node was found. t.resolveDelay = initialResolveDelay t.dest = resolved log.Debug("Resolved node", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}) return true}type dialError struct { error}// dial performs the actual connection attempt.// 节点连接的实现func (t *dialTask) dial(srv *Server, dest *discover.Node) error { fd, err := srv.Dialer.Dial if err != nil { return &dialError{err} } // 新建一个计量连接 mfd := newMeteredConn(fd, false) // 执行握手并尝试将连接方作为一个peer return srv.SetupConn(mfd, t.flags, dest)}

那里的SetupConn方法就是上边Server对象的法子了。里面经过握手球组织议后将节点添加到peers队列。

看完了dialTask,接着来探望其余多个task:
discoverTask和waitExpireTask的Do处理。

// discoverTask的Do处理func (t *discoverTask) Do(srv *Server) { // newTasks generates a lookup task whenever dynamic dials are // necessary. Lookups need to take some time, otherwise the // event loop spins too fast. // 查找任务 next := srv.lastLookup.Add(lookupInterval) if now := time.Now(); now.Before { time.Sleep(next.Sub } srv.lastLookup = time.Now() var target discover.NodeID rand.Read(target[:]) // 查找发现节点的函数 t.results = srv.ntab.Lookup}...func (t waitExpireTask) Do { time.Sleep(t.Duration)}

那般树立连接的代码就看完了。综上,dial通过task职务来在多少个节点之间创造连接。当新建1个dialTask时会检查有着节点并安装情形,然后发起连接。假诺三番五次的节点未有直达动态连接数时,新建discoverTask来发现更加多节点。

前方已经看了一片段p二p源码,明日持续后面包车型大巴来研读源码。

peer

dial对象在四个节点node之间建立连接,当节点建立连接之后正是peer。peer主要处理四个节点建立连接之后的协议处理。

const ( // PeerEventTypeAdd is the type of event emitted when a peer is added // to a p2p.Server // 一个远程节点被添加到服务器 PeerEventTypeAdd PeerEventType = "add" // PeerEventTypeDrop is the type of event emitted when a peer is // dropped from a p2p.Server PeerEventTypeDrop PeerEventType = "drop" // PeerEventTypeMsgSend is the type of event emitted when a // message is successfully sent to a peer PeerEventTypeMsgSend PeerEventType = "msgsend" // PeerEventTypeMsgRecv is the type of event emitted when a // message is received from a peer PeerEventTypeMsgRecv PeerEventType = "msgrecv")// PeerEvent is an event emitted when peers are either added or dropped from// a p2p.Server or when a message is sent or received on a peer connection// Server添加或删除peer时或在peer连接上发送或接收消息时发出的事件type PeerEvent struct { Type PeerEventType `json:"type"` Peer discover.NodeID `json:"peer"` Error string `json:"error,omitempty"` Protocol string `json:"protocol,omitempty"` MsgCode *uint64 `json:"msg_code,omitempty"` MsgSize *uint32 `json:"msg_size,omitempty"`}// Peer represents a connected remote node.// 连接的远程节点type Peer struct { // 节点间连接的底层信息,比如使用的socket以及对端节点支持的协议 rw *conn // 节点间生效运行的协议簇 running map[string]*protoRW log log.Logger created mclock.AbsTime wg sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason // events receives message send / receive events if set // 事件接收消息发送/接收事件 events *event.Feed}

此间定义了devp二p的两种新闻类型,具体的协商项目参见wiki。

const ( // devp2p message codes // 握手消息 handshakeMsg = 0x00 // 断开消息 discMsg = 0x01 // ping pingMsg = 0x02 // ping消息的回复 pongMsg = 0x03)

peer里至关心爱慕要的三个意义正是运维扶助的协议族。

// 运行上层协议func  run() (remoteRequested bool, err error) { var ( // 写入开始的通道 writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason // sent to the peer ) // 开启两个协程,一个用于读取,一个用于ping操作 p.wg.Add // readLoop协程用于接收协议数据 go p.readLoop // pingLoop协程用于保持节点在线 go p.pingLoop() // Start all protocol handlers. // 启动协议 writeStart <- struct{}{} p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect. // 循环执行直到发生错误或断开loop: for { select { case err = <-writeErr: // A write finished. Allow the next write to start if // there was no error. if err != nil { reason = DiscNetworkError break loop } writeStart <- struct{}{} case err = <-readErr: if r, ok := err.(DiscReason); ok { remoteRequested = true reason = r } else { reason = DiscNetworkError } break loop case err = <-p.protoErr: reason = discReasonForError break loop case err = <-p.disc: reason = discReasonForError break loop } } close p.rw.close p.wg.Wait() return remoteRequested, err}...// 开启遍历协议func  startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { p.wg.Add(len(p.running)) // 遍历目前运行的协议族 for _, proto := range p.running { proto := proto proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr var rw MsgReadWriter = proto if p.events != nil { rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) go func() { // 每一个协议开启一个协程调用其Run方法 err := proto.Run if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned } else if err != io.EOF { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done }}

ping方法来维系节点在线很简单,那里看下另2个读打消息的协程readLoop。

func  readLoop(errc chan<- error) { defer p.wg.Done() for { // 接收消息 msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } // 消息接收时间 msg.ReceivedAt = time.Now() // 处理消息 if err = p.handle; err != nil { errc <- err return } }}...// 处理消息func  handle error { // 判断消息类型分别处理 switch { // 收到ping消息,回复pong消息 case msg.Code == pingMsg: msg.Discard() go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() default: // it's a subprotocol message // 获取子协议消息 proto, err := p.getProto if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } select { case proto.in <- msg: return nil case <-p.closed: return io.EOF } } return nil}...// getProto finds the protocol responsible for handling// the given message code.func  getProto(code uint64) (*protoRW, error) { for _, proto := range p.running { if code >= proto.offset && code < proto.offset+proto.Length { return proto, nil } } return nil, newPeerError(errInvalidMsgCode, "%d", code)}

peer里还有个音讯读写类protoEscortW来完结音信的读写。

// 协议读写类type protoRW struct { // 匿名对象 Protocol // 收到消息的通道 in chan Msg // receices read messages closed <-chan struct{} // receives when peer is shutting down wstart <-chan struct{} // receives when write may start werr chan<- error // for write results offset uint64 w MsgWriter}func (rw *protoRW) WriteMsg (err error) { if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset select { case <-rw.wstart: err = rw.w.WriteMsg // Report write status back to Peer.run. It will initiate // shutdown if the error is non-nil and unblock the next write // otherwise. The calling protocol code should exit for errors // as well but we don't want to rely on that. rw.werr <- err case <-rw.closed: err = ErrShuttingDown } return err}func (rw *protoRW) ReadMsg() (Msg, error) { select { case msg := <-rw.in: msg.Code -= rw.offset return msg, nil case <-rw.closed: return Msg{}, io.EOF }}

接下来Protocol类定义了P二P协议;Rubiconlpx定义了P2P网络通信底层的消息加密方法,peer中确立连接的三遍握手就都是在那贯彻的。

至此p二p的中坚源码就大致看完了。首先通过discover目录下的逐条类去发现方圆节点并将它们存款和储蓄到数据库,那里重要涉及Kademlia算法的明白和促成。接着,通过dial在七个节点之间确立连接。随后建立连接的网络链路两端的节点正是peer,peer之间由此支撑的磋商进行报导,建立tcp连接进行新闻的传递。个中,底层的音信传递通过大切诺基LPx
Encayption实行加密传输。

越来越多以太坊源码解析请移驾满世界最益阳性交友网,觉得可行记得给个小star哦

….

互连网颠覆世界,区块链颠覆互连网!

————————————————–20181103 17:58