以太坊源码解读-第5.2讲-rpc源码解读

前言

本文我们将分析rpc模块的源码,如果对rpc概念还不是很清楚的同学,建议先看看这篇文章以太坊源码解读-第5.1讲-rpc官翻及个人理解

本文后半部分解释有误,本文可对rpc了解大概,然后看之后的内容
先来看看该模块下有哪些文件:
.
|____ipc_unix.go
|____ipc_windows.go
|____ipc.go
|____http.go
|____doc.go
|____inproc.go
|____utils.go
|____websocket.go
|____errors.go
|____server.go
|____server_test.go
|____client.go
|____client_example_test.go
|____client_test.go
|____subscription.go
|____subscription_test.go
|____utils_test.go
|____http_test.go
|____types.go
|____types_test.go
|____json.go
|____json_test.go

文件很多,一眼看去真的很头大,小编借鉴rpc源码分析-github中提供的一张图片先全局的介绍一下rpc模块中整体的文件结构,这样方便后面的理解:

图中网络协议channelsJson两部分,其请求和回应的编码和解码都是同时与服务端和客户端打交道的类。网络协议channels主要提供连接和数据传输的功能。 json的编码和解码主要提供请求和回应的序列化和反序列化功能(Json -> Go的对象)。
另外需要知道,各种对外的服务都是被注册到server中的。
ps:要区分server和service的区别,server表示服务实体,service表示被注册到server中的一项服务

types.go源码

要了解rpc,得先要了解rpc中对各大类的定义,因此我们很有必要先来看下这个文件。

对外开放的API定义

外部访问rpc的server进行通信是通过调用这个API来实现。
分析过geth启动rpc的过程后,就会发现,所有的模块,都会把自己的api放在其中。

1
2
3
4
5
6
type API struct {
Namespace string // 在该命名空间下,service的方法被公开
Version string // 给dapp展示的api版本号
Service interface{} // 带有方法的service实例
Public bool // 在公共使用中,指示方法是否为安全的
}

service的定义

service定义了一个用来被注册到server中的服务,需要知道它内部结构是如何的:

1
2
3
4
5
6
type service struct {
name string // service的名称
typ reflect.Type // 类型,反射的
callbacks callbacks // 回调方法的集合
subscriptions subscriptions // 订阅/发布集合
}

发现callbacks和subscriptions都是集合,它是如下定义的:

1
2
type callbacks map[string]*callback      // 回调方法的集合,可以看出是一个map
type subscriptions map[string]*callback // 订阅的集合,也可以看出是一个map

这两个集合都是map类型的,而它们个体本身其实就是一个callback,它的结构如下:

1
2
3
4
5
6
7
8
type callback struct {
rcvr reflect.Value // 反射出方法的值
method reflect.Method // 反射出方法本身
argTypes []reflect.Type // 输入的参数
hasCtx bool // 检测第一个参数是否为context,
errPos int // 返回错误的索引err,无法返回则为-1
isSubscribe bool // 该callback是否为订阅
}

可以看出,该callback主要就是使用反射来确定回调本身的。代码注释解释的可能不太好,大意上是那么一回事。。

server的定义

上面说的service可以理解为一项服务,而server可以理解成是各种服务的容器,service最终是要被注册到server中的,而server的定义如下:

1
2
3
4
5
6
type Server struct {
services serviceRegistry //用来存储service
run int32 //用来控制server是否可运行,1为运行,非1为不可运行
codecsMu sync. //用来保护多线程访问codecs的锁
codecs *set.Set //用来存储所有的编码解码器,其实就是所有的连接。
}

其中的services就是用来存储service集合的,可以看出它是通过serviceRegistry来定义的,而serviceRegistry本身是如下定义的:

1
type serviceRegistry map[string]*service 

呵呵,这下知道了,service在server中是存在map中的

ServerCodec的定义

server中的方法会用到这个东东,客户端发出的请求,对请求的处理,返回响应等过程,都是通过它来进行的。
这个东西很重要,它贯穿了客户端和服务器端的交流,重要的不得了。
先来看看它的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
type ServerCodec interface {
ReadRequestHeaders() ([]rpcRequest, bool, Error) //读取客户端发来的请求
//根据给定的类型解析请求参数
ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error)
CreateResponse(id interface{}, reply interface{}) interface{} //response返回成功
CreateErrorResponse(id interface{}, err Error) interface{} //response返回失败
//response返回失败,包括一些别的信息
CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
CreateNotification(id, namespace string, event interface{}) interface{} // 创建发布响应
Write(msg interface{}) error // 写信息到客户端
Close() // 关闭底层的数据流
Closed() <-chan interface{} // 当底层连接关闭后,则执行该接口方法
}

期间涉及到了两个比较重要的结构体:

  • rpcRequest,它里面包含具体的请求信息:
1
2
3
4
5
6
7
8
type rpcRequest struct {
service string
method string
id interface{}
isPubSub bool
params interface{}
err Error // invalid batch element
}
  • Error,错误信息结构:
1
2
3
4
type Error interface {
Error() string // 返回错误信息
ErrorCode() int // 返回错误代码
}

BlockNumber的定义

这个就是块号的解析,根据输入的byte[]来判断返回的BlockNumber是多少。这个后面大家可以了解下。

json.go

还记得我们前面提到的那个ServerCodec吗,它其实只是一个接口定义,真正其实是该文件中的jsonCodec生成的,jsonCodec实现了ServerCodec的所有接口

jsonCodec结构

rpc消息的请求和响应,以及序列化和解析都是这个结构来实现的,这

1
2
3
4
5
6
7
8
9
type jsonCodec struct {
closer sync.Once // close closed channel once
closed chan interface{} // closed on Close
decMu sync.Mutex // guards the decoder
decode func(v interface{}) error // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode func(v interface{}) error // encoder to allow multiple transports
rw io.ReadWriteCloser // connection
}

创建jsonCodec的重要的两个方法

可以通过NewCodec()方法和NewJSONCodec()方法来创建jsonCodec,方法本身很简单,小编就不解释了,注意编码和解码就行。

jsonCodec方法

这些方法都是具体实现了ServerCodec的接口,想了解具体是怎么实现的,可以看看代码去,小编在这就不详述了。

server.go和server_test.go源码

对上面的定义有了整体把握后,小编结合着server_test.go文件来讲解一下server.go源码。

先来看看server_test.go

为了方便测试,server_test.go中定义了一个Service,这个Service用来被注册到server中,关于这个Service很有必要知道是如何定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type Service struct{}

type Args struct {
S string
}

type Result struct {
String string
Int int
Args *Args
}

func (s *Service) NoArgsRets() {} //第1个有效的回调方法

func (s *Service) Echo(str string, i int, args *Args) Result { //第2个有效的回调方法
return Result{str, i, args}
}

func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args) Result { //第3个有效的回调方法
return Result{str, i, args}
}

func (s *Service) Sleep(ctx context.Context, duration time.Duration) { //第4个有效的回调方法
select {
case <-time.After(duration):
case <-ctx.Done():
}
}

func (s *Service) Rets() (string, error) { //第5个有效的回调方法
return "", nil
}

func (s *Service) InvalidRets1() (error, string) { //无效的回调方法
return nil, ""
}

func (s *Service) InvalidRets2() (string, string) { //无效的回调方法
return "", ""
}

func (s *Service) InvalidRets3() (string, string, error) { //无效的回调方法
return "", "", nil
}

func (s *Service) Subscription(ctx context.Context) (*Subscription, error) { //一个有效的订阅
return nil, nil
}

根据以太坊源码解读-第5.1讲-rpc官翻及个人理解中描述,我们知道上述代码中,Service对应的共有5个有效回调方法(也就是开放的方法),3个无效的回调方法,还有1个有效的订阅方法。在server.services中,是有两个服务的,是不是很好奇,其实在NewServer()时候,就会有一个rpc服务被添加进去的,具体可以看小编后面的介绍。
需要知道,首字母为大写的方法名,被认为是对外开放的方法,不要问我为什么,后面代码里就是这么个逻辑。
我们通过下面的测试用例来验证有效还是无效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestServerRegisterName(t *testing.T) {
server := NewServer() //server新实例
service := new(Service) //具体某服务的实例
if err := server.RegisterName("calc", service); err != nil //根据名称将某服务注册
t.Fatalf("%v", err)
if len(server.services) != 2 //是否有两个服务,其中一个是在NewServer)()时候添加的
t.Fatalf("Expected 2 service entries, got %d", len(server.services))
svc, ok := server.services["calc"] //获取某服务
if !ok
t.Fatalf("Expected service calc to be registered")
if len(svc.callbacks) != 5 //是否有5个有效方法
t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
if len(svc.subscriptions) != 1 //是否有1个有效订阅方法
t.Errorf("Expected 1 subscription for service 'calc', got %d", len(svc.subscriptions))
}

然后server_test.go中还有3个测试方法是用来测试Service中的每个方法是否正确,其中涉及到了rpc的方方面面。这几个测试方法小编就不列出来了,大家可以去看看,方法本身还是很容易理解的。

server.go介绍

经过前面这么多的讲解,大家对rpc的server已经有了一些较为深刻的映像了吧?那我们就来看一下server.go中是如何实现server的。

创建一个新server

需要先创建一个server,这样才能将service注册到该server中,具体是这样创建的,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
func NewServer() *Server {
server := &Server{ //server先实例化
services: make(serviceRegistry), //开辟存储service的空间
codecs: set.New(), //不解释,看前面
run: 1, //1,运行;非1不运行
}

//注册一个默认的rpc服务,该服务可以提供server的一些基本信息,具体看前面小编描述的
rpcService := &RPCService{server} //利用server生成一个rpcService,
server.RegisterName(MetadataApi, rpcService) //MetadataApi=“rpc”
return server
}

代码里,需要注意的是,初始化一个server,然后利用这个server生成一个rpcService,最后在用server把RPCService注册进去。
RPCService的目的是给出server中的一些基本参数信息,目前来说,貌似只能给出拥有的service名称和对应的版本号,而且都是1.0。。。
RPCService的定义以及它的方法如下:

1
2
3
4
5
6
7
8
9
10
11
type RPCService struct {   //这个是RPCService的定义结构,很简单吧。。不解释
server *Server //可以看出,该server是指针引用进来的
}

func (s *RPCService) Modules() map[string]string {
modules := make(map[string]string)
for name := range s.server.services {
modules[name] = "1.0"
}
return modules //其实只是返回每个service的名称和其版本号。。。
}

个人感觉,有点鸡肋额,,

server所拥有的方法

server.go文件中,剩下的内容都是server结构体所对应的方法,都比较重要,我们一个个来介绍

注册服务

前面我们一直在用server.RegisterName()来注册一个服务,但这个服务具体是怎么来执行呢?来,上一坨代码你就知道了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (s *Server) RegisterName(name string, rcvr interface{}) error {
if s.services == nil {
s.services = make(serviceRegistry) //开辟存储service的空间
}

svc := new(service)
svc.typ = reflect.TypeOf(rcvr)
rcvrVal := reflect.ValueOf(rcvr)

if name == "" {
return fmt.Errorf("no service name for type %s", svc.typ.String())
}
if !isExported(reflect.Indirect(rcvrVal).Type().Name()) { //方法名中,首字母大写的被认为是对外开放的方法
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}

methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) //将方法和订阅都反射解析出来

// 若services中已经有了该service,则直接合并方法和订阅
if regsvc, present := s.services[name]; present {
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}
return nil
}

svc.name = name
svc.callbacks, svc.subscriptions = methods, subscriptions

if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}

s.services[svc.name] = //根据名称存入服务
return nil
}

可以发现:

  • 方法或订阅如果是对外开放的,首先要满足其名称的首字母是大写
  • 调用utils.go中的suitableCallbacks()方法,使用service反射后的结果来判断是属于对外开放的方法还是订阅
  • 如果server中先前已经注册过该服务,则将新传入的service和该服务合并。
ServeCodec()方法,异步处理请求

先来看代码:

1
2
3
4
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer codec.Close()
s.serveRequest(codec, false, options) //具体实现后面再讲
}

参数codec中存储的是客户端发来的请求,经过处理后,会将响应结果写入codec中并返回给客户端。
该方法处理完codec中的内容后,会调用codec.Close()接口方法,处理请求结束时候的一些操作。
注意,看s.serveRequest(codec, false, options),里面的false表示该方法是并发处理请求的

ServeSingleRequest()方法,同步处理请求

呵呵,这个方法和上面的那个方法刚好相反,是同步处理请求的,等处理结束后,整个过程才会结束。此结束不提供codec.Close()方法,不用想也该明白,同步结束了,后面该干嘛就干嘛。
代码就不列出来了,自己YY。

serveRequest()方法,具体处理客户端发来的请求

前面讲的两个方法,其实里面都是在调用这个方法的,这个也是我们服务器端的核心,下面小编就来好好探索一下。
在讲之前,小编建议大家好好了解下go语言中并发与并行的一些机制,也是为了更好的读懂这一部分代码,小编整理了下面几篇文章,希望大家先好好读读:
golang语言并发与并行——goroutine和channel的详细理解(一)
golang语言并发与并行——goroutine和channel的详细理解(二)
golang语言并发与并行——goroutine和channel的详细理解(三)

这个方法,其实主要就执行了两个过程:解析读取发来的请求,执行处理请求。期间用到了很多锁和协程的概念,把该方法的主要代码列出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
/**
* golang中的同步是通过sync.WaitGroup来实现的.WaitGroup的功能:它实现了一个类似队列的结构,可以一直向队列中添加任务,当任务完成后便从队列中删除,如果队列中的任务没有完全完成,可以通过Wait()函数来出发阻塞,防止程序继续进行,直到所有的队列任务都完成为止.
* WaitGroup的特点是Wait()可以用来阻塞直到队列中的所有任务都完成时才解除阻塞,而不需要sleep一个固定的时间来等待.但是其缺点是无法指定固定的goroutine数目.可能通过使用channel解决此问题。
*/
var pend sync.WaitGroup

//结束时候的调用
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error(string(buf))
}
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
}()

//context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。
ctx, cancel := context.WithCancel(context.Background()) //创建一个可取消的子Context
defer cancel()

if options&OptionSubscriptions == OptionSubscriptions {
ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
//接受一个*int32类型的指针值,并会返回该指针值指向的那个值
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
return &shutdownError{}
}
s.codecs.Add(codec) //把请求加入集合
s.codecsMu.Unlock()

// test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 { //确保当前server没有停止
reqs, batch, err := s.readRequest(codec) //从其中读到请求信息
if err != nil {
if err.Error() != "EOF" {
log.Debug(fmt.Sprintf("read error %v\n", err))
codec.Write(codec.CreateErrorResponse(nil, err))
}
pend.Wait()
return nil
}

//又是一堆验证
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
resps := make([]interface{}, len(reqs))
for i, r := range reqs {
resps[i] = codec.CreateErrorResponse(&r.id, err)
}
codec.Write(resps)
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
return nil
}

if singleShot { //非并发
if batch {
s.execBatch(ctx, codec, reqs) //批处理请求
} else {
s.exec(ctx, codec, reqs[0])
}
return nil
}
pend.Add(1) //添加一个阻塞任务

go func(reqs []*serverRequest, batch bool) {
defer pend.Done() //处理一个阻塞任务
if batch {
s.execBatch(ctx, codec, reqs) //这个批处理请求
} else {
s.exec(ctx, codec, reqs[0]) //处理单个请求
}
}(reqs, batch)
}
return nil
}

从中我们得知,主要涉及到两个方法:

  • readRequest(codec)
    该方法解析并读取有效的客户端请求,会区分哪些是方法,哪些是订阅,根据不同状况,将这些信息都组装到requests[]中;
  • execBatch()/exec(),用于处理requests[]请求。一个批量处理,一个是单一处理,然后回调。具体的解析,稍后专门来讲。
execBatch()/exec()

这也是server的方法,前面也说了,这两个方法类似,都是处理请求的,一个批量处理,一个单个处理。
小编就只列一下exec()的源码了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
var response interface{}
var callback func()
if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err)
} else {
response, callback = s.handle(ctx, codec, req) //处理请求
}

if err := codec.Write(response); err != nil {
log.Error(fmt.Sprintf("%v\n", err))
codec.Close()
}

if callback != nil {
callback() //执行回调
}
}

方法很简单,从头到尾,能入得了法眼的也只有里面涉及到的handle()方法了。这个handle()真正的处理了请求内容。
接下来那就看一下这个handle()到底执行了哪些东西。

handle()方法

server中最长最重要的一个方法,用于真正的处理数据,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
if req.err != nil {
return codec.CreateErrorResponse(&req.id, req.err), nil
}

if req.isUnsubscribe { // 取消订阅, 第一个参数必须是订阅id
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
notifier, supported := NotifierFromContext(ctx)
if !supported { // interface doesn't support subscriptions (e.g. http)
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}

subid := ID(req.args[0].String())
if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}

return codec.CreateResponse(req.id, true), nil
}
return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
}
//如果是订阅消息。 那么创建订阅。并激活订阅
if req.callb.isSubscribe {
subid, err := s.createSubscription(ctx, codec, req)
if err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}

// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
notifier.activate(subid, req.svcname)
}

return codec.CreateResponse(req.id, subid), activateSub
}

// regular RPC call, prepare arguments
if len(req.args) != len(req.callb.argTypes) {
rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
req.svcname, serviceMethodSeparator, req.callb.method.Name,
len(req.callb.argTypes), len(req.args))}
return codec.CreateErrorResponse(&req.id, rpcErr), nil
}

arguments := []reflect.Value{req.callb.rcvr}
if req.callb.hasCtx {
arguments = append(arguments, reflect.ValueOf(ctx))
}
if len(req.args) > 0 {
arguments = append(arguments, req.args...)
}

//调用提供的rpc方法,并获取reply
reply := req.callb.method.Func.Call(arguments)
if len(reply) == 0 {
return codec.CreateResponse(req.id, nil), nil
}

if req.callb.errPos >= 0 { // test if method returned an error
if !reply[req.callb.errPos].IsNil() {
e := reply[req.callb.errPos].Interface().(error)
res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
return res, nil
}
}
return codec.CreateResponse(req.id, reply[0].Interface()), nil
}
Stop()方法

好吧,server的最后一个方法,调用后,将停止接收请求,并且当阻塞队列中的消息处理结束后,则完整停止。代码就不列了。

至此,server.go内容到此结束。

subscription.go

发布/订阅相关,这里面封装了消息结构,是否订阅等操作,这块的代码其实很简单,有兴趣的再进一步研读吧,小编精力有限,这里就不细看了,待有机会需要了,再来好好读读。

client.go/client_example_test.go/client_test.go源码

大家有没有发现,前面一直在讲server相关,看得越多反而越迷茫,越往后,小编越迫切的想知道,client到底是怎样对接到server端的?
这次,小编打算从client_example_test.go源码开始阅读代码了,这样子更容易理解
打开lient_example_test.go你会发现,里面有一个客户端订阅消息的例子。来,还是先瞅一眼吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type Block struct {
Number *big.Int
}

func ExampleClientSubscription() {
client, _ := rpc.Dial("ws://127.0.0.1:8485") //客户端连接,
subch := make(chan Block)
go func() {
for i := 0; ; i++ {
if i > 0 {
time.Sleep(2 * time.Second)
}
subscribeBlocks(client, subch)
}
}()
for block := range subch {
fmt.Println("latest block:", block.Number)
}
}

func subscribeBlocks(client *rpc.Client, subch chan Block) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

sub, err := client.EthSubscribe(ctx, subch, "newBlocks")
if err != nil {
fmt.Println("subscribe error:", err)
return
}

var lastBlock Block
if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
fmt.Println("can't get latest block:", err)
return
}
subch <- lastBlock
fmt.Println("connection lost: ", <-sub.Err())
}

第1步–客户端连接

首先映入眼帘的是rpc.Dial("ws://127.0.0.1:8485"),它是要连接到websocket服务器端,可以理解为它就是为了建立起一个客户端。我们跟着进入到client.go文件中,会发现它调用的是DialContext()方法,而该方法会根据传入的不同的url类型,选择具体的网络进行连接,可以看出它目前支持的是"http", “https”, “ws” 以及 “wss”,若url没有头部,则认为是本地进行IPC连接。从上面例子我们可知传入的是ws协议的连接,因此,需要接入DialWebsocket(ctx, rawurl, "")这个方法,具体来看一下该方法吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
if origin == "" {
var err error
if origin, err = os.Hostname(); err != nil {
return nil, err
}
if strings.HasPrefix(endpoint, "wss") {
origin = "https://" + strings.ToLower(origin) //origin为websocket客户端源
} else {
origin = "http://" + strings.ToLower(origin) //
}
}
config, err := websocket.NewConfig(endpoint, origin)
if err != nil {
return nil, err
}

return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
return wsDialContext(ctx, config)
})
}

代码中可知,生成对应的origin,然后进一步配置websocket。origin官方的描述意思是一个WebSocket客户端源
一切准备就绪以后,就会执行newClient()方法来生成客户端,下面来讲一下该方法。

第2步–客户端连接内部机制

好吧,真正整合建立客户端连接的方法还是newClient(),该方法有两个参数,一个是上下文,一个是某协议生成并返回连接的方法(ws或者http)
先来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
conn, err := connectFunc(initctx)
if err != nil {
return nil, err
}
_, isHTTP := conn.(*httpConn)

c := &Client{
writeConn: conn,

isHTTP: isHTTP,
connectFunc: connectFunc,
close: make(chan struct{}),
didQuit: make(chan struct{}),
reconnected: make(chan net.Conn),
readErr: make(chan error),
readResp: make(chan []*jsonrpcMessage),
requestOp: make(chan *requestOp),
sendDone: make(chan error, 1),
respWait: make(map[string]*requestOp),
subs: make(map[string]*ClientSubscription),
}
if !isHTTP {
go c.dispatch(conn)
}
return c, nil
}

代码主要就是两个意思,先判断某协议的连接是不是http形式的,如果不是,则会启动一个goroutine调用dispatch方法。 这个方法下一个步骤专门来讲。
另外在此小编补充一句,HTTP协议和非HTTP协议有不同的处理流程, HTTP协议不支持长连接, 只支持一个请求对应一个回应的这种模式,同时也不支持发布/订阅模式。
这里还涉及到了client的结构,为了更好的理解,小编还是把这个结构列出来吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Client struct {
idCounter uint32
//生成连接的函数,客户端会调用这个函数生成一个网络连接对象。
connectFunc func(ctx context.Context) (net.Conn, error)
//HTTP协议和非HTTP协议有不同的处理流程, HTTP协议不支持长连接, 只支持一个请求对应一个回应的这种模式,同时也不支持发布/订阅模式。
isHTTP bool

//通过这里的注释可以看到,writeConn是调用这用来写入请求的网络连接对象,
//只有在dispatch方法外面调用才是安全的,而且需要通过给requestOp队列发送请求来获取锁,
//获取锁之后就可以把请求写入网络,写入完成后发送请求给sendDone队列来释放锁,供其它的请求使用。
writeConn net.Conn

// for dispatch
//下面有很多的channel,channel一般来说是goroutine之间用来通信的通道,后续会随着代码介绍channel是如何使用的。
close chan struct{}
didQuit chan struct{} // closed when client quits
reconnected chan net.Conn // where write/reconnect sends the new connection
readErr chan error // errors from read
readResp chan []*jsonrpcMessage // valid messages from read
requestOp chan *requestOp // for registering response IDs
sendDone chan error // signals write completion, releases write lock
respWait map[string]*requestOp // active requests
subs map[string]*ClientSubscription // active subscriptions
}

第3步–非http协议的dispatch处理

上一步中我们得知,因为我们的协议是ws因此会启动一个goroutine调用dispatch方法。dispatch方法是整个client的指挥中心,通过channel来和其他的goroutine来进行通信,获取信息,根据信息做出各种决策。
这个方法是客户端的一个核心方法,先来看代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(conn net.Conn) {
// Spawn the initial read loop.
go c.read(conn)

var (
lastOp *requestOp // tracks last send operation
requestOpLock = c.requestOp // nil while the send lock is held
reading = true // if true, a read loop is running
)
defer close(c.didQuit)
defer func() {
c.closeRequestOps(ErrClientQuit)
conn.Close()
if reading {
// Empty read channels until read is dead.
for {
select {
case <-c.readResp:
case <-c.readErr:
return
}
}
}
}()

for {
select {
case <-c.close:
return

// Read path.
case batch := <-c.readResp:
//读取到一个回应。调用相应的方法处理
for _, msg := range batch {
switch {
case msg.isNotification():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: notification ", msg)
}})
c.handleNotification(msg)
case msg.isResponse():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: response ", msg)
}})
c.handleResponse(msg)
default:
log.Debug("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: dropping weird message", msg)
}})
// TODO: maybe close
}
}

case err := <-c.readErr:
//接收到读取失败信息,这个是read线程传递过来的。
log.Debug(fmt.Sprintf("<-readErr: %v", err))
c.closeRequestOps(err)
conn.Close()
reading = false

case newconn := <-c.reconnected:
//接收到一个重连接信息
log.Debug(fmt.Sprintf("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()))
if reading {
//等待之前的连接读取完成。
// Wait for the previous read loop to exit. This is a rare case.
conn.Close()
<-c.readErr
}
//开启阅读的goroutine
go c.read(newconn)
reading = true
conn = newconn

// Send path.
case op := <-requestOpLock:
// Stop listening for further send ops until the current one is done.
//接收到一个requestOp消息,那么设置requestOpLock为空,
//这个时候如果有其他人也希望发送op到requestOp,会因为没有人处理而阻塞。
requestOpLock = nil
lastOp = op
//把这个op加入等待队列。
for _, id := range op.ids {
c.respWait[string(id)] = op
}

case err := <-c.sendDone:
//当op的请求信息已经发送到网络上。会发送信息到sendDone。如果发送过程出错,那么err !=nil。
if err != nil {
// Remove response handlers for the last send. We remove those here
// because the error is already handled in Call or BatchCall. When the
// read loop goes down, it will signal all other current operations.
//把所有的id从等待队列删除。
for _, id := range lastOp.ids {
delete(c.respWait, string(id))
}
}
// Listen for send ops again.
//重新开始处理requestOp的消息。
requestOpLock = c.requestOp
lastOp = nil
}
}
}

ps:由于工作上的一些意外,小编需要尽早结束本篇文章,后面剩余部分内容就全是[github-ZtesoftCS](https://github.com/ZtesoftCS/go-ethereum-code-analysis/blob/master/rpc%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.md)中的了,在此先感谢该作者了
下面通过下面这种图来说明dispatch的主要流程。下面图片中圆形是线程。 蓝色矩形是channel。 箭头代表了channel的数据流动方向。

  • 多线程串行发送请求到网络上的流程 首先发送requestOp请求到dispatch获取到锁, 然后把请求信息写入到网络,然后发送sendDone信息到dispatch解除锁。 通过requestOp和sendDone这两个channel以及dispatch代码的配合完成了串行的发送请求到网络上的功能。
  • 读取返回信息然后返回给调用者的流程。 把请求信息发送到网络上之后, 内部的goroutine read会持续不断的从网络上读取信息。 read读取到返回信息之后,通过readResp队列发送给dispatch。 dispatch查找到对应的调用者,然后把返回信息写入调用者的resp队列中。完成返回信息的流程。
  • 重连接流程。 重连接在外部调用者写入失败的情况下被外部调用者主动调用。 调用完成后发送新的连接给dispatch。 dispatch收到新的连接之后,会终止之前的连接,然后启动新的read goroutine来从新的连接上读取信息。
  • 关闭流程。 调用者调用Close方法,Close方法会写入信息到close队列。 dispatch接收到close信息之后。 关闭didQuit队列,关闭连接,等待read goroutine停止。 所有等待在didQuit队列上面的客户端调用全部返回。

订阅部分小编暂时就不讲了。

全文总结

本篇写的有点仓促,小编也认为写的很不合格,由于一些意外,小编源码阅读暂时停止更新。
后面若有机会,会修正本文内容的。

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信