以太坊源码解读-第5.4讲-http rpc server相关源码详解

前言

在上一篇文章以太坊源码解读-第5.3讲-http rpc的启动流程分析的基础上,我们继续深入探索以太坊的rpc机制.
以太坊源码解读-第5.2讲-rpc源码解读中,小编也介绍过rpc的实现,但涉及的有点广,可能并不太容易理解,为此,本文将结合http rpc的注册流程来讲解一下rpc的实现。

我们已经得知,http rpc的调用流程方式是:
创建server:rpc.NewServer()->注册service:handler.RegisterName(api.Namespace, api.Service)->启动监听:rpc.NewHTTPServer(cors, vhosts, handler).Serve(listener)->停止监听:n.httpListener.Close()->服务停止:n.httpHandler.Stop()
下面我们一步步来了解每个过程是如何进行的。

ps:先前我么已经得知,rpc总共有4类,这里小编由于时间和精力,只介绍http rpc的调用细节。
pps:注意server和service的区别,下文中很多地方将会涉及到。

第一步:创建server

进入到这个
先来看一下创建server(rpc->server.go)的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewServer() *Server {
//实例化一个server
server := &Server{
services: make(serviceRegistry),
codecs: set.New(),
run: 1,
}
//先注册一个RPC Service,该services是一个空的
//将server加入到RPCService中
rpcService := &RPCService{server}
//此处MetadataApi=“rpc”
server.RegisterName(MetadataApi, rpcService)
return server
}

这段代码中,可以得知以下几个重要信息:

  1. 创建的server是被加入到RPCService中的。这个RPCService结构体如下:
1
2
3
type RPCService struct {
server *Server
}
  1. 返回的是一个server
  2. 可以大胆想象,之后所有的service注册后,都会被添加到第一个RPCService中的server,也就是说,RPCService管理着其余所有的service

第一步:Server相关结构体

创建过程中,涉及到了Server相关结构,很重要,很有必要掌握。
rpc->types.go中,对serverservice的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//service使用传入的名称作为key来区分
type serviceRegistry map[string]*service

//用于标准化注册进来的服务
//这是以太坊每个功能模块提供的服务,都是在模块的api.service中
type service struct {
name string // service的名称
typ reflect.Type // 类型,反射的
callbacks callbacks // 回调方法的集合
subscriptions subscriptions // 订阅/发布集合
}

type Server struct {
services serviceRegistry //用来存储service
run int32 //用来控制server是否可运行,1为运行,非1为不可运行
codecsMu sync. //用来保护多线程访问codecs的锁
codecs *set.Set //用来存储所有的编码解码器,其实就是所有的连接。
}

可以看出,每一个service都会被装填到server中(也就是注册,具体注册细节后面会专门讲到),了解过geth启动流程后,会明白,这里的service表示的是每个以太坊功能模块api.service中的东西。这个api在rpc中是有定义的(同样是在types.go中):

1
2
3
4
5
6
type API struct {
Namespace string // 在该命名空间下,service的方法被公开
Version string // 给dapp展示的api版本号
Service interface{} // 这个就是要被注册到server中的service
Public bool // 在公共使用中,指示方法是否为安全的
}

service中的callbacks和subscriptions其实都是callbacks,这个是rpc回调用到的。结构如下:

1
2
3
4
5
6
7
8
9
10
11
type callbacks map[string]*callback      // 回调方法的集合,可以看出是一个map
type subscriptions map[string]*callback // 订阅的集合,也可以看出是一个map

type callback struct {
rcvr reflect.Value // 反射出方法的值
method reflect.Method // 反射出方法本身
argTypes []reflect.Type // 输入的参数
hasCtx bool // 检测第一个参数是否为context,
errPos int // 返回错误的索引err,无法返回则为-1
isSubscribe bool // 该callback是否为订阅
}

第二步:注册service

  1. 前面创建server时候,我们已经发现其注册了一个RPCService,并且将server填充到了RPCService中。创建返回的结果是该server,之后的service都是在该server下注册的,也就是说,RPCService管理着之后的所有service
  2. 前面我们也知道了,http rpc会调用rpc->server.go中的RegisterName()方法来进行注册。
  3. 接下来我们就进一步分析整个注册流程。

RegisterName()

先来看一下rpc->server.go注册的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//RegisterName 注册服务
//传入的rcvr可以是任意的,传入后都会被标准化处理
func (s *Server) RegisterName(name string, rcvr interface{}) error {
if s.services == nil {
s.services = make(serviceRegistry)
}

//注册进来的rcvr都被标准化为service
svc := new(service)
svc.typ = reflect.TypeOf(rcvr)
rcvrVal := reflect.ValueOf(rcvr)

if name == "" {
return fmt.Errorf("no service name for type %s", svc.typ.String())
}

//判断传进来的服务类型首字母是否为大写
if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
fmt.Println(reflect.Indirect(rcvrVal).Type().Name())
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}
//获取到服务的回调方法和订阅
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)

//根据name判断,若services中已经有了该service,则直接更新方法和订阅
if regsvc, present := s.services[name]; present {
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
for _, m := range methods {
//使用首字母小写的方法名作为key来保存该service的最新的方法
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
//使用首字母小写的订阅名作为key来保存该service的最新的订阅
regsvc.subscriptions[formatName(s.method.Name)] = s
}
return nil //更新完后,则退出
}
svc.name = name
svc.callbacks, svc.subscriptions = methods, subscriptions
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
//将service保存在server中
s.services[svc.name] = svc
return nil
}

注册阶段,会反射解析传入的service,有可能是普通的rpc,也有可能是订阅,根据各自的规则进行判断,比如订阅规则,可参考以太坊源码解读-第5.5讲-rpc 订阅模块解读.
从上面代码也可知道,真正解析service的是suitableCallbacks()方法,它会将rpc普通方法和订阅解析出来。

第三步:启动http监听

服务器注册完毕后,就该启动网络监听了。
rpc->http.go中,NewHTTPServer()方法启动监听。
再来看看http rpc是怎样启动网络监听的:go rpc.NewHTTPServer(cors, vhosts, handler).Serve(listener)
异步,开了个goroutine。
那就看看NewHTTPServer()的源码是什么:
ps:大概看了下最新的代码2018.11.09的,结构稍微发生了点变化,但依旧用到了NewHTTPServer(),虽然该方法一直被标注为Deprecated,然并卵。

1
2
3
4
5
6
7
//小编看的代码是两个月前的,貌似和最新版的的有些差距
//最新版本的代码在现有基础上,又加入了延时等操作,但不影响整体阅读
func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
handler := newCorsHandler(srv, cors) //主要是浏览器读取时候的些问题设置
handler = newVHostHandler(vhosts, handler) //host的设置
return &http.Server{Handler: handler}
}

第四、五步:停止监听

这本身就是http内部的工作机制,
net.Listener.Close()
server.stop()这个可以看源码去,不复杂,就不解释了。

总结

至此,rpc server端的启动就结束了,应该还好理解吧

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信