新手问题 kubernetes 依赖库apimachinery中的 wait 库功能(1)

jemygraw · 2018年09月06日 · 687 次阅读

该库提供了很多基于周期性执行的方法,以及约束周期性执行的方法。

周期性执行一个函数

在某些情况下,我们需要周期性地执行一些动作,比如发送心跳请求给 master,那么可以使用 wait 库中的 Forever 功能。 这里给一个简单的例子,每隔一秒钟输出当前的时间。

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    wait.Forever(func() {
        fmt.Println(time.Now().String())
    }, time.Second)
}

带 StopSignal 的周期性执行函数

上面的 Wait 函数其实是 Util 的变体,Util 本身还带有一个 stopSignal 选项。比如我们要删除一个 CDN 资源,然后删除之后周期性地检查文件是否还可以访问。可以用下面的逻辑。我们这里用 counter 来代替检查资源状态的判断逻辑。

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

var stopSignal = make(chan struct{})

func main() {
    var counter = 1
    wait.Until(func() {
        if counter > 10 {
            close(stopSignal)
        }
        fmt.Println(time.Now().String())
        counter++
    }, time.Second, stopSignal)

}

sync.WaitGroup 的封装及扩展

最简单的是对 WaitGroup 的简单封装

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    g := wait.Group{}
    for i := 0; i < 100; i++ {
        j := i
        g.Start(func() {
            fmt.Println(j)
        })
    }
    g.Wait()
}

我们再假设一个场景,老大说大家去抓网页,差不多抓满 1000 个网页就结束。这个时候大家并发去抓,想要同步是比较困难的,另外什么时候通知大家结束也比较麻烦。这里,我们可以用下面的这样的框架代码。

package main

import (
    "fmt"
    "time"

    "sync/atomic"

    "k8s.io/apimachinery/pkg/util/wait"
)

var stopSignal = make(chan struct{})

func main() {
    g := wait.Group{}
    var counter int32
    for i := 0; i < 100; i++ {
        j := i
        g.StartWithChannel(stopSignal, func(stopCh <-chan struct{}) {
            for {
                //quit if
                if atomic.LoadInt32(&counter) > 1000 {
                    return
                }
                //otherwise
                select {
                case <-stopSignal:
                    return
                default:
                    fmt.Println(j, time.Now().String())
                    atomic.AddInt32(&counter, 1)
                    <-time.After(time.Second)
                }
            }
        })
    }
    g.Wait()
}

刚刚的场景还可以使用StartWithContext方法来实现。

package main

import (
    "context"
    "fmt"
    "time"

    "sync/atomic"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    g := wait.Group{}
    var counter int32
    ctx, cancelFunc := context.WithCancel(context.Background())
    for i := 0; i < 100; i++ {
        j := i
        g.StartWithContext(ctx, func(ctx context.Context) {
            for {
                //quit if
                if atomic.LoadInt32(&counter) > 1000 {
                    cancelFunc() //fire cancel signal
                }
                //otherwise
                select {
                case <-ctx.Done(): //cancel signal received
                    return
                default:
                    fmt.Println(j, time.Now().String())
                    atomic.AddInt32(&counter, 1)
                    <-time.After(time.Second)
                }
            }
        })
    }
    g.Wait()
}
更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册