Go 每日一库之 gron

简介

gron是一个比较小巧、灵活的定时任务库,可以执行定时的、周期性的任务。gron提供简洁的、并发安全的接口。我们先介绍gron库的使用,然后简单分析一下源码。

快速使用

先安装:

1
复制代码$ go get github.com/roylee0704/gron

后使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码package main

import (
"fmt"
"sync"
"time"

"github.com/roylee0704/gron"
)

func main() {
var wg sync.WaitGroup
wg.Add(1)

c := gron.New()
c.AddFunc(gron.Every(5*time.Second), func() {
fmt.Println("runs every 5 seconds.")
})
c.Start()

wg.Wait()
}

gron的使用比较简单:

  • 首先调用gron.New()创建一个管理器,这是一个定时任务的管理器;
  • 然后调用管理器的AddFunc()Add()方法向它添加任务,在启动时添加也是可以的,见下文分析;
  • 最后调用管理器的Start()方法启动它。

gron支持两种添加任务的方式,一种是使用无参数的函数,另一种是实现任务接口。上面例子中使用的是前一种方式,实现接口的方式我们后面会介绍。添加任务时通过gron.Every()指定周期任务的间隔,上面添加了一个 5s 的周期任务,每隔 5s 输出一行文字。

需要注意的是,我们使用sync.WaitGroup保证主 goroutine 不退出。因为c.Start()中只是启动了一个 goroutine,如果主 goroutine 退出了,整个程序就停止了。

运行程序,每隔 5s 输出:

1
2
3
复制代码runs every 5 seconds.
runs every 5 seconds.
runs every 5 seconds.

该程序需要按下ctrl + c停止!

时间格式

gron接受time.Duration类型的时间间隔,除了time包中定义的基础Second/Minute/Hourgron中的xtime子包还提供了Day/Week单位的时间。有一点需要注意,gron支持的时间精度为 1s,小于 1s 的间隔是不支持的。除了单位时间间隔,我们还可以使用4m10s这样的时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码func main() {
var wg sync.WaitGroup
wg.Add(1)

c := gron.New()
c.AddFunc(gron.Every(1*time.Second), func() {
fmt.Println("runs every second.")
})
c.AddFunc(gron.Every(1*time.Minute), func() {
fmt.Println("runs every minute.")
})
c.AddFunc(gron.Every(1*time.Hour), func() {
fmt.Println("runs every hour.")
})
c.AddFunc(gron.Every(1*xtime.Day), func() {
fmt.Println("runs every day.")
})
c.AddFunc(gron.Every(1*xtime.Week), func() {
fmt.Println("runs every week.")
})
t, _ := time.ParseDuration("4m10s")
c.AddFunc(gron.Every(t), func() {
fmt.Println("runs every 4 minutes 10 seconds.")
})
c.Start()

wg.Wait()
}

通过gron.Every()设置每隔多长时间执行一次任务。对于大于 1 天的时间间隔,我们还可以使用gron.Every().At()指定其在某个时间点执行。例如下面的程序,从第二天的22:00开始,每隔一天触发一次,即每天的22:00触发:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码func main() {
var wg sync.WaitGroup
wg.Add(1)

c := gron.New()
c.AddFunc(gron.Every(1*xtime.Day).At("22:00"), func() {
fmt.Println("runs every second.")
})
c.Start()

wg.Wait()
}

自定义任务

实现自定义任务也很简单,只需要实现gron.Job接口即可:

1
2
3
4
复制代码// src/github.com/roylee0704/gron/cron.go
type Job interface {
Run()
}

我们需要调用调度器的Add()方法向管理器添加自定义任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码type GreetingJob struct {
Name string
}

func (g GreetingJob) Run() {
fmt.Println("Hello ", g.Name)
}

func main() {
var wg sync.WaitGroup
wg.Add(1)

g1 := GreetingJob{Name: "dj"}
g2 := GreetingJob{Name: "dajun"}

c := gron.New()
c.Add(gron.Every(5*time.Second), g1)
c.Add(gron.Every(10*time.Second), g2)
c.Start()

wg.Wait()
}

上面我们编写了一个GreetingJob结构,实现gron.Job接口,然后创建两个对象g1/g2,一个 5s 触发一次,一个 10s 触发一次。使用自定义任务的方式可以比较好地处理携带状态的任务,如上面的Name字段。

实际上,AddFunc()方法内部也是通过Add()实现的:

1
2
3
4
5
6
7
8
9
10
复制代码// src/github.com/roylee0704/gron/cron.go
func (c *Cron) AddFunc(s Schedule, j func()) {
c.Add(s, JobFunc(j))
}

type JobFunc func()

func (j JobFunc) Run() {
j()
}

AddFunc()内部,将传入的函数转为JobFunc类型,而gronJobFunc实现了gron.Job接口。是不是与net/http包中的HandleFuncHandle很像。如果注意观察的话,在很多 Go 语言的代码中都有此类模式。

一点源码

gron的源码只有两个文件cron.goschedule.gocron.go中实现添加任务和调度的方法,schedule.go中是时间策略相关的代码。两个文件算上注释一共才 260 行!我们添加的任务在gron内部都是以Entry结构表示的:

1
2
3
4
5
6
复制代码type Entry struct {
Schedule Schedule
Job Job
Next time.Time
Prev time.Time
}

Next为下次执行时间,Prev为上次执行时间,Job是要执行的任务,Schedulegron.Schedule接口类型,调用其Next()可计算出下次执行的时间点。

管理器使用gron.Cron结构表示:

1
2
3
4
5
6
复制代码type Cron struct {
entries []*Entry
running bool
add chan *Entry
stop chan struct{}
}

任务的调度在另外一个 goroutine 中。如果调度未开始,添加任务可直接appendentries切片中;如果调度已开始(Start()方法已调用),需要向通道add发送待添加的任务。任务调度的核心逻辑在Run()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码func (c *Cron) run() {
var effective time.Time
now := time.Now().Local()

// to figure next trig time for entries, referenced from now
for _, e := range c.entries {
e.Next = e.Schedule.Next(now)
}

for {
sort.Sort(byTime(c.entries))
if len(c.entries) > 0 {
effective = c.entries[0].Next
} else {
effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
}

select {
case now = <-after(effective.Sub(now)):
// entries with same time gets run.
for _, entry := range c.entries {
if entry.Next != effective {
break
}
entry.Prev = now
entry.Next = entry.Schedule.Next(now)
go entry.Job.Run()
}
case e := <-c.add:
e.Next = e.Schedule.Next(time.Now())
c.entries = append(c.entries, e)
case <-c.stop:
return // terminate go-routine.
}
}
}

执行流程如下:

  1. 调度器刚启动时,先计算所有任务的下次执行时间;
  2. 然后在一个for循环中,按照执行时间从早到晚排序,取出最近需要执行任务的时间点;
  3. select语句中等待到这个时间点,启动新的 goroutine 执行到期的任务,每个任务一个新的 goroutine;
  4. 如果在等待的过程中,又添加了新的任务(通过通道c.add),计算这个新任务的首次执行时间。跳到步骤 2,因为新添加的任务可能最早执行。

有几个细节需要注意一下:

  1. 任务到期判断使用的是本地时间:time.Now().Local()
  2. 如果没有任务,等待时间设置为now.AddDate(15, 0, 0),即 15 年,防止 CPU 空转;
  3. 任务都是在独立的 goroutine 中执行的;
  4. 通过实现sort.Interface接口可以实现自定义排序(代码中的byTime)。

最后,我们来看一下时间策略的代码。我们知道在Entry结构中存储了一个gron.Schedule类型的对象,调用该对象的Next()方法返回下次执行的时间点:

1
2
3
4
复制代码// src/github.com/roylee0704/gron/schedule.go
type Schedule interface {
Next(t time.Time) time.Time
}

gron内置实现了两种Schedule,一种是periodicSchedule,即周期触发gron.Every()函数返回的就是这个对象:

1
2
3
4
复制代码// src/github.com/roylee0704/gron/schedule.go
type periodicSchedule struct {
period time.Duration
}

一种是固定时刻的周期触发,它实际上也是周期触发,只是固定了时间点:

1
2
3
4
5
复制代码type atSchedule struct {
period time.Duration
hh int
mm int
}

他们的核心逻辑在Next()方法中,periodicSchedule只需要用当前时间加上周期即可得到下次触发时间。这里Truncate()方法截掉了当前时间中小于 1s 的部分:

1
2
3
复制代码func (ps periodicSchedule) Next(t time.Time) time.Time {
return t.Truncate(time.Second).Add(ps.period)
}

atScheduleNext()方法先计算当天该时间点,再加上周期就是下次触发的时间:

1
2
3
4
5
6
7
8
9
10
11
复制代码func (as atSchedule) reset(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
}

func (as atSchedule) Next(t time.Time) time.Time {
next := as.reset(t)
if t.After(next) {
return next.Add(as.period)
}
return next
}

periodicSchedule提供了At()方法可以转为atSchedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码func (ps periodicSchedule) At(t string) Schedule {
if ps.period < xtime.Day {
panic("period must be at least in days")
}

// parse t naively
h, m, err := parse(t)

if err != nil {
panic(err.Error())
}

return &atSchedule{
period: ps.period,
hh: h,
mm: m,
}
}

自定义时间策略

我们可以很轻松的实现一个自定义的时间策略。例如,我们要实现一个“指数退避”的时间序列,先等待 1s,然后 2s、4s…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码type ExponentialBackOffSchedule struct {
last int
}

func (e *ExponentialBackOffSchedule) Next(t time.Time) time.Time {
interval := time.Duration(math.Pow(2.0, float64(e.last))) * time.Second
e.last += 1
return t.Truncate(time.Second).Add(interval)
}

func main() {
var wg sync.WaitGroup
wg.Add(1)

c := gron.New()
c.AddFunc(&ExponentialBackOffSchedule{}, func() {
fmt.Println(time.Now().Local().Format("2006-01-02 15:04:05"), "hello")
})
c.Start()

wg.Wait()
}

运行结果如下:

1
2
3
4
复制代码2020-04-20 23:47:11 hello
2020-04-20 23:47:13 hello
2020-04-20 23:47:17 hello
2020-04-20 23:47:25 hello

第二次输出与第一次相差 2s,第三次与第二次相差 4s,第4次与第三次相差 8s,完美!

总结

本文介绍了gron这个小巧的定时任务库,如何使用,如何自定义任务和时间策略,顺带分析了一下源码。gron源码实现非常简洁,非常推荐阅读!

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. gron GitHub:github.com/roylee0704/…
  2. Go 每日一库 GitHub:github.com/darjun/go-d…

我的博客:darjun.github.io

欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%