Micro源码系列 - Go-Micro服务的构造过程

在每一个技术领域,特别是微服务这一分支,Go-Micro尝试以最简单的方式帮助大家以最快的速度构建微服务。我们有比较详细但是仍需要大量持续更新、增进的文档,也有足够多的示例代码帮助大家验证每个功能点是否满足大家的业务需求。同时,我们在总结很多朋友的常询问的问题后,发现大家对Go-Micro的源码实现都很有兴趣。

故而,在大家对了解如何使用Go-Micro来构建服务后,我们专门用这一系列来从设计到源码系统地讲解Go-Micro

想尝试Go-Micro?点击了解 中文教程

总架构

首先,我们从Go-Micro的架构图开始,回顾一下:

如上图所示,Go-Micro由三层设计共5大模块组成。

最上层的service是基于Go-Micro所构建的服务,属于应用层,它属于Go-Micro末端。

中层的ClientServer是第一层中service所包含的请求端响应端,它们存在于service中,处于设计中的中游,是Go-Micro体系中一切请求与响应的出入口。

最下层的便是Go-Micro核心5模块所在,broker负责消息,Codec负责编码,Registry负责注册发现,Selector负责负载均衡,Transport负责接收请求与响应。

Go-Micro本质是一个网络库(networking lib)

三段式

Go-Micro中原生服务通常由三段组成:NewServiceInitRun。它们功能分别是创建服务,初始化服务,运行服务。

func main() {
    service := micro.NewService(
        micro.Name("go.micro.srv.example"),
    )

    service.Init()

    if err := service.Run(); err != nil {
        log.Fatal(err)
    }
}

上面的代码便可以启动一个微服务。接下来,我们便从经典三段入手,先讲解这三段背后各自隐藏的逻辑。

NewService

NewService负责创建新服务。它的参数是micro.Option函数数组,此类函数专用来装配启动参数或配置选项。

func NewService(opts ...Option) Service {
    return newService(opts...)
}

整个NewService有三个步骤:

参考上图,对2,3步骤可以有更好的理解。服务中有两个对象Client与Server,分别负责发送请求与接收请求。

构建Service服务核心代码:

service.go

func newService(opts ...Option) Service {

    // 初始化配置选项
    options := newOptions(opts...)

    // 包装客户端
    options.Client = &clientWrapper{
        options.Client,
        metadata.Metadata{
            HeaderPrefix + "From-Service": options.Server.Options().Name,
        },
    }

    // 组装服务
    return &service{
        opts: options,
    }
}

1.初始化配置选项

配置选项通过Option函数来传递

Option

type Option func(*Options)

我们就用micro.Name("go.micro.srv.example")来说明

options.go

// Name of the service
func Name(n string) Option {
    return func(o *Options) {
        o.Server.Init(server.Name(n))
    }
}

上述代码中接收服务名n后即返回一个回参为Option的匿名函数。函数中的o.Server.Init(server.Name(n)),我们这里不讲,大家只需要知道是服务的配置初始化逻辑,我们把它放到后面的专门讲解Server(MockServer,RpcServer,HttpServer)的章节里讲解。

更多选项配置函数参考:options.go

在上面的newService代码中,第一部分就是初始化传入的配置:

func newService(opts ...Option) Service {

    // 初始化配置选项
    options := newOptions(opts...)

    // ...
}

我们再来看看newOptions方法

func newOptions(opts ...Option) Options {
    opt := Options{
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Registry:  registry.DefaultRegistry,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
    }

    for _, o := range opts {
        o(&opt)
    }

    return opt
}

可见,初始化配置中,预置有基础组件默认配置。再调用opts中传入的参数,逐个执行定制的配置函数。

Option函数的参数是Options,里面维护服务配置信息,比如注册、命令行信息等。

每个Option函数执行完后即返回配置对象Options。

2.包装客户端

包装客户端在配置初始化之后,客户端由包装器clientWrapper封装。

func newService(opts ...Option) Service {

    options.Client = &clientWrapper{
        options.Client,
        metadata.Metadata{
            HeaderPrefix + "From-Service": options.Server.Options().Name,
        },
    }
}

包装器继承了客户端,并自带由元数据组成的头:

wrapper.go

type clientWrapper struct {
    client.Client
    headers metadata.Metadata
}

go-micro中的Client都有三个能力:

  • 调用RPC Call
  • 流式请求 Stream
  • 广播消息 Publish
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    ctx = c.setHeaders(ctx)
    return c.Client.Call(ctx, req, rsp, opts...)
}

func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
    ctx = c.setHeaders(ctx)
    return c.Client.Stream(ctx, req, opts...)
}

func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
    ctx = c.setHeaders(ctx)
    return c.Client.Publish(ctx, p, opts...)
}

其实Service中Client并没有过多改变原生Client行为,只是在其发送请求或广播前把元数据放到上下文中传向下游服务传递。见setHeaders:

setHeaders

func (c *clientWrapper) setHeaders(ctx context.Context) context.Context {

    // 复制元数据信息
    mda, _ := metadata.FromContext(ctx)
    md := metadata.Copy(mda)

    // 预置头信息塞到元数据
    for k, v := range c.headers {
        if _, ok := md[k]; !ok {
            md[k] = v
        }
    }

    // 元数据塞到上下文中
    return metadata.NewContext(ctx, md)
}

3.组装服务

Go-Micro中的server非常简单

type service struct {
    opts Options
    once sync.Once
}

服务结构中只有配置项与单次锁。

以上就是service的构造过程。其最核心的地方便是初始化配置选项包装客户端。本章的主要内容是介绍服务构建流程,所以不会深入讲解配置项,后面的章节会讲。

Init

初始化函数在构造服务之后,由服务调用,它的工作主要是再次渲染传入Init的Option配置项,然后再初始化命令行参数。

func (s *service) Init(opts ...Option) {
    // process options
    for _, o := range opts {
        o(&s.opts)
    }

    s.once.Do(func() {
        // 初始化命令行参数,会覆盖的参数
        _ = s.opts.Cmd.Init(
            cmd.Broker(&s.opts.Broker),
            cmd.Registry(&s.opts.Registry),
            cmd.Transport(&s.opts.Transport),
            cmd.Client(&s.opts.Client),
            cmd.Server(&s.opts.Server),
        )
    })
}

由于在NewService中加载过一次传入的配置项,而Init中如果有配置项传入,会改变之此前的值,故而,在Go-Micro体系中,配置项的生效顺序由小到大依次是:默认值 < 硬编码值(随构造函数传入)< 命令行参数

我们再用micro.Name("go.micro.srv.example")来说明,假设我们通过下列4种方式指定服务名:

  • 默认值
service := micro.NewService()
  • 随参传入
service := micro.NewService(
        micro.Name("go.micro.api.new"), 
        )
  • 随参传入 Init
service.Init(micro.Name("go.micro.api.init"),)
  • 命令行参数
go run main.go --server_name=go.micro.api.cmd

它们各自打印的服务名分别是:

  • server
  • go.micro.api.new
  • go.micro.api.init
  • go.micro.api.cmd

Init函数的本质是再次加载配置,并确保权重最高的命令行参数生效

Run

在服务构造与初始化完成后,便可以执行Run函数,它负责新增协程把服务启动(Start),然后通过select指令阻塞并侦听相关的关闭信号,随后执行关闭程序。

Run

func (s *service) Run() error {

    // 启动服务
    if err := s.Start(); err != nil {
        return err
    }

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

    select {
    // 侦听kill信号
    case <-ch:
    // 侦听上下文关闭信号
    case <-s.opts.Context.Done():
    }

    return s.Stop()
}

在上述代码中我们看到,服务启动后由signal.Notify方法将三个信号量绑定到信道上:

  • syscall.SIGTERM 结束进程
  • syscall.SIGINT ctrl + c
  • syscall.SIGQUIT ctrl + \

题外,类Unix标准系统中,只有如下几个信号绑定了键盘

SIGHUP、SIGINT、SIGTTOU、SIGTTIN、SIGQUIT、SIGTSTP

服务启动

服务的启动由Run函数中的启动方法Start完成

func (s *service) Run() error {

    // 启动服务
    if err := s.Start(); err != nil {
        return err
    }

    // ...
}

Start

func (s *service) Start() error {

    // 启动勾子
    for _, fn := range s.opts.BeforeStart {
        if err := fn(); err != nil {
            return err
        }
    }

    if err := s.opts.Server.Start(); err != nil {
        return err
    }

    // 启动完成勾子
    for _, fn := range s.opts.AfterStart {
        if err := fn(); err != nil {
            return err
        }
    }

    return nil
}

启动勾子

启动函数Start中前后各有启动开始和结束两个勾子选项,可以在NewService或Init中将其传入:

    service.Init(
        micro.BeforeStart(func() error {
            log.Log(1)
            return nil
        }),
        micro.BeforeStart(func() error {
            log.Log(2)
            return nil
        }),
        micro.BeforeStop(func() error {
            return nil
        }),
    )

需要多个就传入多个,在渲染配置时会将它们附加到指定启动函数数组中:

func BeforeStart(fn func() error) Option {
    return func(o *Options) {
        // 有多个则会附加进去
        o.BeforeStart = append(o.BeforeStart, fn)
    }
}

服务启动

func (s *service) Start() error {

    // ...

    if err := s.opts.Server.Start(); err != nil {
        return err
    }

    // ...
}

服务启动是通过在配置项的中的Server对象的Start方法触发。

整个Start分为以下几个步骤

  1. 注册调试接口endpoint,Debug.{Method}
  2. transport 分配地址
  3. broker 分配地址并连接
  4. register 注册
  5. 侦听连接请求与关闭信号
  6. 循环保活

我们来看关键代码(删除了部分逻辑,增加注释)

func (s *rpcServer) Start() error {

    // 1. 注册调试接口endpoint,Debug.{Method}
    registerDebugHandler(s)
    config := s.Options()

    // 2. transport 分配地址
    ts, err := config.Transport.Listen(config.Address)
    if err != nil {
        return err
    }

    // 默认端口是0,意味着会随机分配,但是我们需要反填随机端口,以让请求能够到达
    s.Lock()
    addr := s.opts.Address
    s.opts.Address = ts.Addr()
    s.Unlock()

    // 3. broker 分配地址并连接
    if err := config.Broker.Connect(); err != nil {
        return err
    }

    // 4. register 注册
    if err := s.Register(); err != nil {
        log.Log("Server register error: ", err)
    }

    exit := make(chan bool)

    // 新协程侦听请求与关闭信号
    go func() {
        for {
            // listen for connections
            err := ts.Accept(s.ServeConn)

            select {
            // check if we're supposed to exit
            case <-exit:
                return
            // check the error and backoff
            default:
                if err != nil {
                    log.Logf("Accept error: %v", err)
                    time.Sleep(time.Second)
                    continue
                }
            }

            // no error just exit
            return
        }
    }()

    // 6. 循环保活
    go func() {
        t := new(time.Ticker)

        // only process if it exists
        if s.opts.RegisterInterval > time.Duration(0) {
            // new ticker
            t = time.NewTicker(s.opts.RegisterInterval)
        }

        // return error chan
        var ch chan error

    Loop:
        for {
            select {
            // 重新注册,保持节点在线
            case <-t.C:
                if err := s.Register(); err != nil {
                    log.Log("Server register error: ", err)
                }
            // 侦听退出信号
            case ch = <-s.exit:
                t.Stop()
                close(exit)
                break Loop
            }
        }

        // 退出后往注册中心卸载节点
        if err := s.Deregister(); err != nil {
            log.Log("Server deregister error: ", err)
        }

        // 等待所有请求都返回
        if wait(s.opts.Context) {
            s.wg.Wait()
        }

        // 关闭侦听
        ch <- ts.Close()

        // 关闭broker
        config.Broker.Disconnect()

        // 归还原始值,不重要,只是为了回归原始
        s.Lock()
        s.opts.Address = addr
        s.Unlock()
    }()

    return nil
}

我们在代码中注释了具体过程。但是其中非常重要的环节,比如register注册、broker连接等我们不深入讲解,会放到其实章节中演示,这也是为了让本章主题更向服务构建与启动靠拢。

总结

我们在这一章节中讲了Go-Micro服务是如何构造与启动的,下一章我们会讲解micro服务的注册是如何实现的。

Micro源码系列

  1. Go-Micro服务的构造过程
  2. [Go-Micro注册解读(in progress)]

Micro 中文资源

  1. 中文示例集
  2. 中文教程
  3. 中文博客
  4. Micro服务治理控制台
已邀请:

weisd

赞同来自:

用go-micro两年了,支持一下

要回复问题请先登录注册