前面介绍协程时,对协程的管理和控制我们并没有进行讨论。到目前我们已经清楚认识了channel、context以及sync包,通过这三者,我们完全可以达到完美控制协程运行的目的。
通过go关键字让我们很容易启动一个协程,但难的是很好的管理和控制他们的运行。有几种方法我们可以根据场景使用:
- 使用sync.WaitGroup,它用于线程总同步,会等待一组线程集合完成,才会继续向下执行,这对监控所有子协程全部完成情况特别有用,但要控制某个协程就无能为力了;
- 使用channel来传递消息,一个协程来发送channel信号,另一个协程通过select来得到channel信息,这种方式可以满足协程之间的通信,来控制协程运行。但如果协程数量达到一定程度,就很难把控了;或者这两个协程还和其他协程也有类似通信,比如A与B,B与C,如果A发信号B退出了,C有可能等不到B的channel信号而被遗忘;
- 使用Context来传递消息,Context是层层传递机制,根节点完全控制了子节点,根节点(父节点)可以根据需要选择自动还是手动结束子节点。而每层节点所在的协程就可以根据信息来决定下一步的操作。
**下面我们来看看具体使用Context怎么来控制协程的运行:**这里用Context同时控制2个协程,这2个协程都可以收到cancel()发出的信号,甚至doNothing这样不结束协程可反复接收cancel信息。
package main
import (
"context"
"log"
"os"
"time"
)
var logs *log.Logger
func doClearn(ctx context.Context) {
// for 循环来每1秒work一下,判断ctx是否被取消了,如果是就退出
for {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
logs.Println("doClearn:收到Cancel,做好收尾工作后马上退出。")
return
default:
logs.Println("doClearn:每隔1秒观察信号,继续观察...")
}
}
}
func doNothing(ctx context.Context) {
for {
time.Sleep(3 * time.Second)
select {
case <-ctx.Done():
logs.Println("doNothing:收到Cancel,但不退出......")
// 注释return可以观察到,ctx.Done()信号是可以一直接收到的,return不注释意味退出协程
//return
default:
logs.Println("doNothing:每隔3秒观察信号,一直运行")
}
}
}
func main() {
logs = log.New(os.Stdout, "", log.Ltime)
// 新建一个ctx
ctx, cancel := context.WithCancel(context.Background())
// 传递ctx
go doClearn(ctx)
go doNothing(ctx)
// 主程序阻塞20秒,留给协程来演示
time.Sleep(20 * time.Second)
logs.Println("cancel")
// 调用cancel:context.WithCancel 返回的CancelFunc
cancel()
// 发出cancel 命令后,主程序阻塞10秒,再看协程的运行情况
time.Sleep(10 * time.Second)
}
程序输出:
......
cancel
doClearn:收到Cancel,做好收尾工作后马上退出。
doNothing:收到Cancel,但不退出......
doNothing:收到Cancel,但不退出......
doNothing:收到Cancel,但不退出......
这里用Context嵌套控制3个协程,A,B,C。在主程序发出cancel信号后,每个协程都能接收根Context的Done()信号而退出。
package main
import (
"context"
"fmt"
"time"
)
func A(ctx context.Context) int {
ctx = context.WithValue(ctx, "AFunction", "Great")
go B(ctx)
select {
// 监测自己上层的ctx ...
case <-ctx.Done():
fmt.Println("A Done")
return -1
}
return 1
}
func B(ctx context.Context) int {
fmt.Println("A value in B:", ctx.Value("AFunction"))
ctx = context.WithValue(ctx, "BFunction", 999)
go C(ctx)
select {
// 监测自己上层的ctx ...
case <-ctx.Done():
fmt.Println("B Done")
return -2
}
return 2
}
func C(ctx context.Context) int {
fmt.Println("B value in C:", ctx.Value("AFunction"))
fmt.Println("B value in C:", ctx.Value("BFunction"))
select {
// 结束时候做点什么 ...
case <-ctx.Done():
fmt.Println("C Done")
return -3
}
return 3
}
func main() {
// 自动取消(定时取消)
{
timeout := 10 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
fmt.Println("A 执行完成,返回:", A(ctx))
select {
case <-ctx.Done():
fmt.Println("context Done")
break
}
}
time.Sleep(20 * time.Second)
}
最后我们看看Context在http 是怎么传递的:
package main
import (
"context"
"net/http"
"time"
)
// ContextMiddle是http服务中间件,统一读取通行cookie并使用ctx传递
func ContextMiddle(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cookie, _ := r.Cookie("Check")
if cookie != nil {
ctx := context.WithValue(r.Context(), "Check", cookie.Value)
next.ServeHTTP(w, r.WithContext(ctx))
} else {
next.ServeHTTP(w, r)
}
})
}
// 强制设置通行cookie
func CheckHandler(w http.ResponseWriter, r *http.Request) {
expitation := time.Now().Add(24 * time.Hour)
cookie := http.Cookie{Name: "Check", Value: "42", Expires: expitation}
http.SetCookie(w, &cookie)
}
func indexHandler(w http.ResponseWriter, r *http.Request) {
// 通过取中间件传过来的context值来判断是否放行通过
if chk := r.Context().Value("Check"); chk == "42" {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Let's go! \n"))
} else {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("No Pass!"))
}
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", indexHandler)
// 人为设置通行cookie
mux.HandleFunc("/chk", CheckHandler)
ctxMux := ContextMiddle(mux)
http.ListenAndServe(":8080", ctxMux)
}
我们打开浏览器访问:http://localhost:8080/chk ,然后再访问:http://localhost:8080/ ,将会看到我们正常通行后结果,否则将会看到没有正常通行下的信息。Context信息的传递主要靠中间件ContextMiddle来进行。
下一节:序列化 (Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。