2. pkg/context

在 Go http包的Server中,每一个请求在都有一个对应的 goroutine 去处理。请求处理函数通常会启动额外的 goroutine 用来访问后端服务,比如数据库和RPC服务。用来处理一个请求的 goroutine 通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息、验证相关的token、请求的截止时间。 当一个请求被取消或超时时,所有用来处理该请求的 goroutine 都应该迅速退出,然后系统才能释放这些 goroutine 占用的资源。

2.1. context 初识

Go1.7加入了一个新的标准库context,它定义了Context类型,专门用来简化 对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

对服务器传入的请求应创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancelWithDeadlineWithTimeoutWithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。

2.2. context 接口

context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

其中:

  • Deadline方法需要返回当前Context被取消的时间,也就是完成工作的截止时间(deadline);

  • Done方法需要返回一个Channel,这个Channel会在当前工作完成或者上下文被取消之后关闭,多次调用Done方法会返回同一个Channel;

  • Err方法会返回当前 Context 结束的原因,它只会在 Done 返回的Channel被关闭时才会返回非空的值;

    • 如果当前Context被取消就会返回Canceled错误;

    • 如果当前Context超时就会返回DeadlineExceeded错误;

  • Value方法会从Context中返回键对应的值,对于同一个上下文来说,多次调用Value 并传入相同的Key会返回相同的结果,该方法仅用于传递跨API和进程间跟请求域的数据;

Background()TODO()

Go内置两个函数:Background()TODO(),这两个函数分别返回一个实现了Context接口的backgroundtodo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。

  • Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。

  • TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。

backgroundtodo本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。

为什么需要context

WaitGroup 和信道(channel)是常见的 2 种并发控制的方式。

如果并发启动了多个子协程,需要等待所有的子协程完成任务,WaitGroup 非常适合于这类场景,例如下面的例子:

var wg sync.WaitGroup

func doTask(n int) {
	time.Sleep(time.Duration(n))
	fmt.Printf("Task %d Done\n", n)
	wg.Done()
}

func main() {
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go doTask(i + 1)
	}
	wg.Wait()
	fmt.Println("All Task Done")
}

wg.Wait() 会等待所有的子协程任务全部完成,所有子协程结束后,才会执行 wg.Wait() 后面的代码。

Task 3 Done
Task 1 Done
Task 2 Done
All Task Done

WaitGroup 只是傻傻地等待子协程结束,但是并不能主动通知子协程退出。假如开启了一个定时轮询的子协程,有没有什么办法,通知该子协程退出呢?这种场景下,可以使用 select+chan 的机制。

var stop chan bool

func reqTask(name string) {
	for {
		select {
		case <-stop:
			fmt.Println("stop", name)
			return
		default:
			fmt.Println(name, "send request")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	stop = make(chan bool)
	go reqTask("worker1")
	time.Sleep(3 * time.Second)
	stop <- true
	time.Sleep(3 * time.Second)
}

子协程使用 for 循环定时轮询,如果 stop 信道有值,则退出,否则继续轮询。

worker1 send request
worker1 send request
worker1 send request
stop worker1

更复杂的场景如何做并发控制呢?比如子协程中开启了新的子协程,或者需要同时控制多个子协程。这种场景下,select+chan的方式就显得力不从心了。

Go 语言提供了 Context 标准库可以解决这类场景的问题,Context 的作用和它的名字很像,上下文,即子协程的下上文。Context 有两个主要的功能:

  • 通知子协程退出(正常退出,超时退出等);

  • 传递必要的参数。

2.3. with 系列函数

此外,context包中还定义了四个With系列函数。

1. withCancel

context.WithCancel() 创建可取消的 Context 对象,即可以主动通知子协程退出。

WithCancel的函数签名如下:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

(1)控制单个协程

使用 Context 改写上述的例子,效果与 select+chan 相同。

func reqTask(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop", name)
			return
		default:
			fmt.Println(name, "send request")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go reqTask(ctx, "worker1")
	time.Sleep(3 * time.Second)
	cancel()
	time.Sleep(3 * time.Second)
}

说明:

  • context.Backgroud() 创建根 Context,通常在 main 函数、初始化和测试代码中创建,作为顶层 Context。

  • context.WithCancel(parent) 创建可取消的子 Context,同时返回函数 cancel

  • 在子协程中,使用 select 调用 <-ctx.Done() 判断是否需要退出。

  • 主协程中,调用 cancel() 函数通知子协程退出。

(2)控制多个协程

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	
	go reqTask(ctx, "worker1")
	go reqTask(ctx, "worker2")
	
	time.Sleep(3 * time.Second)
	cancel()
	time.Sleep(3 * time.Second)
}

为每个子协程传递相同的上下文 ctx 即可,调用 cancel() 函数后该 Context 控制的所有子协程都会退出。

worker1 send request
worker2 send request
worker1 send request
worker2 send request
worker1 send request
worker2 send request
stop worker1
stop worker2

2. withDeadline

超时退出可以控制子协程的最长执行时间,那 context.WithDeadline() 则可以控制子协程的最迟退出时间

WithDeadline的函数签名如下:

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

返回父上下文的副本,并将deadline调整为不迟于d。如果父上下文的deadline已经早于d,则WithDeadline(parent, d)在语义上等同于父上下文。当截止日过期时,当调用返回的cancel函数时,或者当父上下文的Done通道关闭时,返回上下文的Done通道将被关闭,以最先发生的情况为准。

取消此上下文将释放与其关联的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel。

func reqTask(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop", name, ctx.Err())
			return
		default:
			fmt.Println(name, "send request")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
	go reqTask(ctx, "worker1")
	go reqTask(ctx, "worker2")

	time.Sleep(3 * time.Second)
	fmt.Println("before cancel")
	cancel()
	time.Sleep(3 * time.Second)
}
  • WithDeadline 用于设置截止时间。在这个例子中,将截止时间设置为1s后,cancel() 函数在 3s 后调用,因此子协程将在调用 cancel() 函数前结束。

  • 在子协程中,可以通过 ctx.Err() 获取到子协程退出的错误原因。

运行结果如下:

worker2 send request
worker1 send request
stop worker2 context deadline exceeded
stop worker1 context deadline exceeded
before cancel

可以看到,子协程 worker1worker2 均是因为截止时间到了而退出。

3. withTimeout

如果需要控制子协程的执行时间,可以使用 context.WithTimeout 创建具有超时通知机制的 Context 对象。

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	go reqTask(ctx, "worker1")
	go reqTask(ctx, "worker2")

	time.Sleep(3 * time.Second)
	fmt.Println("before cancel")
	cancel()
	time.Sleep(3 * time.Second)
}

WithTimeout的函数签名如下:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithTimeout返回WithDeadline(parent, time.Now().Add(timeout))

取消此上下文将释放与其相关的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel,通常用于数据库或者网络连接的超时控制。具体示例如下:

package main

import (
	"context"
	"fmt"
	"sync"

	"time"
)

// context.WithTimeout

var wg sync.WaitGroup

func worker(ctx context.Context) {
LOOP:
	for {
		fmt.Println("db connecting ...")
		time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
		select {
		case <-ctx.Done(): // 50毫秒后自动调用
			break LOOP
		default:
		}
	}
	fmt.Println("worker done!")
	wg.Done()
}

func main() {
	// 设置一个50毫秒的超时
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	wg.Add(1)
	go worker(ctx)
	time.Sleep(time.Second * 5)
	cancel() // 通知子goroutine结束
	wg.Wait()
	fmt.Println("over")
}

4. withValue

如果需要往子协程中传递参数,可以使用 context.WithValue()

WithValue函数能够将请求作用域的数据与 Context 对象建立关系。声明如下:

func WithValue(parent Context, key, val interface{}) Context

WithValue返回父节点的副本,其中与key关联的值为val。

type Options struct{
    Interval time.Duration
}

func reqTask(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop", name)
			return
		default:
			fmt.Println(name, "send request")
			op := ctx.Value("options").(*Options)
			time.Sleep(op.Interval * time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	vCtx := context.WithValue(ctx, "options", &Options{1})

	go reqTask(vCtx, "worker1")
	go reqTask(vCtx, "worker2")

	time.Sleep(3 * time.Second)
	cancel()
	time.Sleep(3 * time.Second)
}
  • context.WithValue() 创建了一个基于 ctx 的子 Context,并携带了值 options

  • 在子协程中,使用 ctx.Value("options") 获取到传递的值,读取/修改该值。

2.4. context 注意事项

  • 推荐以参数的方式显示传递Context

  • 以Context作为参数的函数方法,应该把Context作为第一个参数。

  • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO()

  • Context的Value相关方法应该传递请求域的必要数据,不应该用于传递可选参数

  • Context是线程安全的,可以放心的在多个goroutine中传递

2.5. 客户端超时取消case

调用服务端API时如何在客户端实现超时控制?

1. server 端

// context_timeout/server/main.go
package main

import (
	"fmt"
	"math/rand"
	"net/http"

	"time"
)

// server端,随机出现慢响应

func indexHandler(w http.ResponseWriter, r *http.Request) {
	number := rand.Intn(2)
	if number == 0 {
		time.Sleep(time.Second * 10) // 耗时10秒的慢响应
		fmt.Fprintf(w, "slow response")
		return
	}
	fmt.Fprint(w, "quick response")
}

func main() {
	http.HandleFunc("/", indexHandler)
	err := http.ListenAndServe(":8000", nil)
	if err != nil {
		panic(err)
	}
}

2. client 端

// context_timeout/client/main.go
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"
)

// 客户端

type respData struct {
	resp *http.Response
	err  error
}

func doCall(ctx context.Context) {
	transport := http.Transport{
	   // 请求频繁可定义全局的client对象并启用长链接
	   // 请求不频繁使用短链接
	   DisableKeepAlives: true, 	}
	client := http.Client{
		Transport: &transport,
	}

	respChan := make(chan *respData, 1)
	req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
	if err != nil {
		fmt.Printf("new requestg failed, err:%v\n", err)
		return
	}
	req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client request
	var wg sync.WaitGroup
	wg.Add(1)
	defer wg.Wait()
	go func() {
		resp, err := client.Do(req)
		fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
		rd := &respData{
			resp: resp,
			err:  err,
		}
		respChan <- rd
		wg.Done()
	}()

	select {
	case <-ctx.Done():
		//transport.CancelRequest(req)
		fmt.Println("call api timeout")
	case result := <-respChan:
		fmt.Println("call server api success")
		if result.err != nil {
			fmt.Printf("call server api failed, err:%v\n", result.err)
			return
		}
		defer result.resp.Body.Close()
		data, _ := ioutil.ReadAll(result.resp.Body)
		fmt.Printf("resp:%v\n", string(data))
	}
}

func main() {
	// 定义一个100毫秒的超时
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
	defer cancel() // 调用cancel释放子goroutine资源
	doCall(ctx)
}

相关资料

  1. 出处:https://golang.google.cn/pkg/context/