Programming Knowledge - cdmana.com

A detailed explanation of the WaitGroup source code design in Go

The Go language provides goroutines, which make it easy to write concurrent programs. However, keeping these concurrently executing goroutines under effective control is what we need to discuss. As I described in 《Golang Concurrency Control》, among the synchronization primitives in Go's standard library, locks and atomic operations focus on data safety between goroutines, while WaitGroup, channel and Context control their concurrent behavior. I have already analyzed the implementation of locks, atomic operations and channels in detail in earlier articles, so this article focuses on WaitGroup.

Getting to know WaitGroup

WaitGroup belongs to the sync package and is used to synchronize goroutines. As its name suggests, it fits the scenario where we need to wait for a group of goroutines to finish before doing follow-up processing.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	wg.Add(2) // number of workers: 2

	go func() {
		// worker 1 does something
		fmt.Println("goroutine 1 done!")
		wg.Done()
	}()

	go func() {
		// worker 2 does something
		fmt.Println("goroutine 2 done!")
		wg.Done()
	}()

	wg.Wait() // block until both workers have called Done
	fmt.Println("all work done!")
}

// output (the order of the two goroutine lines may vary)
goroutine 2 done!
goroutine 1 done!
all work done!

As you can see, WaitGroup is very simple to use: it provides three methods. Although goroutines have no parent-child relationship, for ease of understanding this article calls the goroutine that calls Wait the main goroutine, and the goroutines that call Done child goroutines.

func (wg *WaitGroup) Add(delta int) // adds delta to the WaitGroup's counter of child goroutines
func (wg *WaitGroup) Done()         // a child goroutine's task is complete; decrements the counter by 1
func (wg *WaitGroup) Wait()         // blocks the calling goroutine until the counter reaches 0

So how is it implemented? In the source file src/sync/waitgroup.go, we can see that its core code is fewer than 100 lines. It is very concise and well worth studying.

Prerequisites

Little code does not mean the implementation is easy to understand. On the contrary, readers who lack the background knowledge below will find it hard to truly understand how WaitGroup is implemented. Before parsing the source code, let's review this knowledge first (if you have already mastered it, you can jump straight to the source code analysis later on).

Semaphore

From operating systems courses, we know that a semaphore is a mechanism for protecting shared resources and solving multithreaded synchronization problems. A semaphore s is a global variable with a nonnegative integer value that can only be manipulated by two special operations, called P and V.

  • P(s): If s is nonzero, P decrements s by 1 and returns immediately. If s is zero, the thread is suspended until s becomes nonzero and the thread is woken up by another thread performing a V(s) operation. After waking up, P decrements s by 1 and returns control to the caller.
  • V(s): V increments s by 1. If any threads are blocked in a P operation waiting for s to become nonzero, V wakes one of them, which then decrements s by 1 to complete its P operation.
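To make the P and V semantics concrete, here is a minimal sketch of a counting semaphore written in ordinary Go. The type semaphore and its methods are hypothetical names used only for illustration; they are built on sync.Mutex and sync.Cond, whereas the Go runtime's own semaphore is implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// semaphore is a hypothetical counting semaphore that only illustrates
// P/V semantics; it is not how the Go runtime implements its semaphore.
type semaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	count uint32 // the nonnegative value s
}

func newSemaphore(initial uint32) *semaphore {
	s := &semaphore{count: initial}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// P blocks while s is zero, then decrements s by 1.
func (s *semaphore) P() {
	s.mu.Lock()
	for s.count == 0 {
		s.cond.Wait() // suspend until a V operation signals
	}
	s.count--
	s.mu.Unlock()
}

// V increments s by 1 and wakes one goroutine blocked in P, if any.
func (s *semaphore) V() {
	s.mu.Lock()
	s.count++
	s.mu.Unlock()
	s.cond.Signal()
}

func main() {
	sem := newSemaphore(0)
	done := make(chan struct{})
	go func() {
		sem.P() // blocks until main calls V
		fmt.Println("acquired after V")
		close(done)
	}()
	sem.V()
	<-done
}
```

P re-checks the count in a loop after every wake-up instead of using a single if, which is the standard way to use sync.Cond.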

The Go runtime provides the following underlying semaphore functions:

  • runtime_Semacquire(s *uint32) blocks the goroutine until the semaphore s is greater than 0, then atomically decrements it; this is the P operation.
  • runtime_Semrelease(s *uint32, lifo bool, skipframes int) atomically increments the semaphore and then notifies a goroutine blocked in runtime_Semacquire; this is the V operation.

These two semaphore functions are used not only in WaitGroup: in the article 《Go's Smart Mutex Design》, we saw that Go's mutex implementation also relies on semaphores.

Memory alignment

For the following struct, can you tell how much memory it occupies?

package main

import (
	"fmt"
	"unsafe"
)

type Ins struct {
	x bool  // 1 byte
	y int32 // 4 bytes
	z byte  // 1 byte
}

func main() {
	ins := Ins{}
	fmt.Printf("ins size: %d, align: %d\n", unsafe.Sizeof(ins), unsafe.Alignof(ins))
}

// output
ins size: 12, align: 4

Judging by the field sizes alone, an ins value should occupy 1+4+1=6 bytes, but it actually occupies 12 bytes. This is caused by memory alignment. In the article 《The Impact of the CPU Cache System on Go Programs》, we saw that the CPU does not read memory byte by byte but block by block. Therefore, when values are aligned in memory, loads and stores are very efficient.

The memory footprint of an aggregate type (struct or array) may be larger than the sum of its elements. The compiler inserts unused padding bytes into the gaps so that each successive member or element is properly aligned relative to the start of the struct or array.


Therefore, when designing a struct whose members have different types, placing members of the same type next to each other can save memory.

Atomic operation CAS

CAS (compare-and-swap) is an atomic operation used in multithreaded programming to exchange data without interruption. It avoids the data inconsistency caused by unpredictable execution order and interrupts when multiple threads modify the same data at the same time. The operation compares a value in memory with an expected value and, only if they are equal, replaces the value in memory with a new value. I described the underlying implementation of Go's atomic operations in detail in the article 《The Cornerstone of Synchronization Primitives》.
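The retry loop below is a minimal sketch of how CAS is typically used: read the current value, compute a new one, and let CompareAndSwapUint64 succeed only if no other goroutine changed the value in between. casIncrement is a hypothetical helper; the functions it calls come from the standard sync/atomic package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casIncrement adds 1 to *addr using a compare-and-swap retry loop.
func casIncrement(addr *uint64) {
	for {
		old := atomic.LoadUint64(addr)
		// Succeeds only if *addr still equals old, i.e. nobody changed it since the load.
		if atomic.CompareAndSwapUint64(addr, old, old+1) {
			return
		}
		// Another goroutine won the race; reload and retry.
	}
}

func main() {
	var counter uint64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				casIncrement(&counter)
			}
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint64(&counter)) // 100000
}
```

For a plain increment, atomic.AddUint64 would be simpler. The explicit loop matters when the update must only happen if the old value is still what we observed, which is exactly how Wait registers itself as a waiter later in this article.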

Shift operations >> and <<

In the earlier articles on locks, 《Go's Smart Mutex Design》 and 《Go's Finer-Grained Read-Write Lock Design》, we saw many bit operations. Used flexibly, bit operations let an ordinary number carry rich meaning. Here we only introduce the shift operations used later on.

The left shift << moves all bits of a number to the left by the given number of positions: the high bits are discarded and the vacated low bits are filled with zeros. As long as no overflow occurs, shifting left by 1 bit is equivalent to multiplying by 2, and shifting left by n bits is equivalent to multiplying by 2 to the power of n.

The right shift >> moves all bits to the right by the given number of positions: the low bits are shifted out, and the vacated high bits are filled with the sign bit (for signed integers) or with zeros (for unsigned integers). For non-negative values, shifting right by 1 bit is equivalent to dividing by 2, and shifting right by n bits is equivalent to dividing by 2 to the power of n, keeping the quotient and discarding the remainder.
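The short program below checks these rules with concrete numbers. It also shows a detail worth remembering for signed integers: an arithmetic right shift rounds toward negative infinity, while Go's integer division truncates toward zero, so the two agree only for non-negative values.

```go
package main

import "fmt"

func main() {
	fmt.Println(3 << 1)  // 6: shifting left by 1 multiplies by 2
	fmt.Println(3 << 4)  // 48: shifting left by 4 multiplies by 2^4
	fmt.Println(48 >> 4) // 3: shifting right by 4 divides by 2^4
	fmt.Println(7 >> 1)  // 3: the remainder is discarded

	var neg int32 = -7
	fmt.Println(neg >> 1) // -4: signed shift fills with the sign bit
	fmt.Println(neg / 2)  // -3: integer division truncates toward zero

	var u uint8 = 0xF0
	fmt.Println(u >> 4) // 15: unsigned shift fills with zeros
	fmt.Println(u << 1) // 224: the top bit of 0x1E0 does not fit in uint8 and is discarded
}
```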

Shift operations also enable some clever tricks; we will see a more advanced application of them later in the source code.

unsafe.Pointer and uintptr

Pointers in Go fall into three categories: 1. ordinary typed pointers *T, such as *int; 2. unsafe.Pointer; 3. uintptr.

  • *T: the ordinary pointer type, used to pass object addresses; pointer arithmetic is not allowed.
  • unsafe.Pointer: a generic pointer. Any ordinary pointer *T can be converted to unsafe.Pointer, and an unsafe.Pointer can be converted back to an ordinary pointer whose type need not be the same as the original *T. However, it does not support pointer arithmetic and cannot be dereferenced to read memory (it must first be converted to a pointer of a specific type).
  • uintptr: strictly speaking, uintptr is not a pointer but an unsigned integer type large enough to hold the bits of any pointer value. unsafe.Pointer can be converted to and from uintptr; because a uintptr holds the numeric address the pointer refers to, pointer arithmetic can be done on that value. The GC does not treat a uintptr as a pointer, so an object referenced only through a uintptr value can be garbage collected.


unsafe.Pointer is a bridge: it lets ordinary pointers of any type be converted to one another, and lets any pointer be converted to uintptr for pointer arithmetic. However, because unsafe.Pointer can be converted to any pointer type, it allows arbitrary values to be written to memory, which breaks Go's type system. Moreover, not every integer is a valid memory address, so converting from uintptr back to unsafe.Pointer also breaks the type system. Since Go deliberately named the package unsafe, you should not use it casually.
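The sketch below walks the conversion chain *T -> unsafe.Pointer -> uintptr -> unsafe.Pointer -> *T, the same kind of conversion WaitGroup.state() performs. elementPtr is a hypothetical helper; the key point is that the uintptr arithmetic and the conversion back to unsafe.Pointer happen in a single expression, which is the pattern the unsafe package documents as valid.

```go
package main

import (
	"fmt"
	"unsafe"
)

// elementPtr returns a pointer to arr[i] computed with uintptr arithmetic.
// The caller must ensure i < 3; there is no bounds check. Storing the uintptr
// in a variable and converting it back later would not be safe.
func elementPtr(arr *[3]uint32, i uintptr) *uint32 {
	base := unsafe.Pointer(&arr[0])
	return (*uint32)(unsafe.Pointer(uintptr(base) + i*unsafe.Sizeof(arr[0])))
}

func main() {
	arr := [3]uint32{10, 20, 30}

	// Converting to uintptr turns the address into a plain integer,
	// so its alignment can be tested the way WaitGroup.state() does.
	fmt.Println("8-byte aligned:", uintptr(unsafe.Pointer(&arr))%8 == 0)

	fmt.Println(*elementPtr(&arr, 1)) // 20
}
```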

Source code analysis

This article is based on the Go 1.15.7 source code.

Structure

The sync.WaitGroup struct is defined as follows. It contains an auxiliary noCopy field and a state1 field that carries a composite meaning.

type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// If the address of state1 is divisible by 8, it is 64-bit aligned
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

Here, noCopy is an empty struct: it occupies no memory, and the compiler does not pad it. Its main purpose is to let the go vet tool perform static checks that stop developers from copying a WaitGroup during use, which would be a latent bug. For more on this, see 《The no copy Mechanism》.

state1 is a uint32 array of length 3. It represents three things: 1. counter, the number of child goroutines set via Add(); 2. the waiter count, the number of goroutines blocked in Wait(); 3. the semaphore semap.

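Because later code operates on statep as a uint64, and 64-bit atomic operations require 64-bit alignment, which 32-bit compilers do not guarantee, the layout of state1 depends on whether its address is 8-byte aligned. When it is aligned (the usual 64-bit case), state1[0] and state1[1] form the 64-bit state (counter in the high 32 bits, waiter count in the low 32 bits) and state1[2] is the semaphore; otherwise (the 32-bit case), state1[0] is the semaphore and state1[1] and state1[2] form the state.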


Note that when a WaitGroup object is initialized, its counter, waiter and semap values are all 0.

Add function

Add() takes an integer argument, which may be positive or negative, and adds it to counter. If counter becomes 0, all waiters blocked in Wait() are woken up; if counter becomes negative, Add panics.

With the race-detection code removed, the source of Add() is as follows:

func (wg *WaitGroup) Add(delta int) {
	// Get statep, the composite state holding counter and waiter, and semap, the semaphore
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)

	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	if v > 0 || w == 0 {
		return
	}

	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Reaching this point means counter == 0 and waiter > 0.
	// Only a goroutine that executed Add(-x) can get here, and its
	// execution means all child goroutines have finished their tasks.
	// So reset the whole composite state to 0 and release the
	// semaphore once per waiter.
	*statep = 0
	for ; w != 0; w-- {
		// Release the semaphore; each call wakes one blocked waiter
		runtime_Semrelease(semap, false, 0)
	}
}

The code is very compact; let's analyze its key parts.

	state := atomic.AddUint64(statep, uint64(delta)<<32) // add delta to counter (the high 32 bits)
	v := int32(state >> 32)                              // get the counter value
	w := uint32(state)                                   // get the waiter value

Here statep points to a uint64. Suppose it currently holds counter = 2 and waiter = 1, and delta is 1. Shifting delta left by 32 bits gives 1<<32, which adds 1 to the high 32 bits only: the state goes from 2<<32 | 1 to 3<<32 | 1, so v = 3 and w = 1.
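The worked example above can be checked in code. pack and addDelta are hypothetical helpers that reproduce the first three lines of Add on a local uint64. The second call also shows why a negative delta works: converting it to uint64 and shifting left wraps around, so the addition decrements only the high 32 bits.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pack builds a state word: counter in the high 32 bits, waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// addDelta mimics Add: shift delta into the high half, add it atomically,
// then split the result back into counter (v) and waiter (w).
func addDelta(statep *uint64, delta int) (v int32, w uint32) {
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	return int32(state >> 32), uint32(state)
}

func main() {
	state := pack(2, 1) // counter = 2, waiter = 1

	v, w := addDelta(&state, 1)
	fmt.Println(v, w) // 3 1

	// uint64(-3)<<32 wraps around, which subtracts 3 from the high half
	// and leaves the waiter count in the low half untouched.
	v, w = addDelta(&state, -3)
	fmt.Println(v, w) // 0 1
}
```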

After obtaining the current counter value v and waiter count w, Add checks them and handles several cases.

	// Case 1: a basic error; the counter value can never be negative
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

	// Case 2: misuse causes a panic.
	// A WaitGroup can be reused, but only after all of its state has been reset to 0.
	// Here counter has just gone from 0 to delta while waiters are still recorded,
	// so this Add ran concurrently with Wait.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Case 3: this Add only needs to update counter, so return directly.
	// If counter > 0, waking the waiters is left to a later caller of Add(negative int).
	// If waiter == 0, no goroutine is currently blocked in Wait.
	if v > 0 || w == 0 {
		return
	}

	// Case 4: misuse causes a panic (the state changed after the atomic add above)
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
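Case 1 is easy to trigger on purpose. The sketch below calls Done on a WaitGroup whose counter is 0 and recovers the panic so the message can be inspected. negativeCounterPanic is a hypothetical helper; the panic string is the one in the Go 1.15 source above.

```go
package main

import (
	"fmt"
	"sync"
)

// negativeCounterPanic calls Done on a fresh WaitGroup (counter 0)
// and returns the value carried by the resulting panic.
func negativeCounterPanic() (recovered interface{}) {
	defer func() { recovered = recover() }()
	var wg sync.WaitGroup
	wg.Done() // Add(-1) drives counter to -1, which is case 1
	return nil
}

func main() {
	fmt.Println(negativeCounterPanic()) // sync: negative WaitGroup counter
}
```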

The panics triggered by misuse and reuse are hard to explain without examples of faulty code. Fortunately, the Go source includes demonstrations of incorrect usage in src/sync/waitgroup_test.go; readers who want to learn more can look at the examples in these three test functions:

func TestWaitGroupMisuse(t *testing.T)
func TestWaitGroupMisuse2(t *testing.T)
func TestWaitGroupMisuse3(t *testing.T)

Done function

Done() is relatively simple: it just calls Add(-1). In practice, each child goroutine should call Done() once its task is complete.

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait function

If the WaitGroup's counter is greater than 0, the main goroutine executing Wait() increments waiter by 1 and then blocks until counter drops to 0 before continuing with the code that follows.

With the race-detection code removed, the source of Wait() is as follows:

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep) // atomically read the composite state statep
		v := int32(state >> 32)            // get the counter value
		w := uint32(state)                 // get the waiter value (only used by the race-detection code omitted here)
		// If v == 0, there are no child goroutines left to wait for, so return.
		if v == 0 {
			return
		}
		// If no other goroutine changed the composite state between the load and the CAS,
		// increment the waiter count; otherwise, loop and re-read the composite state.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// The waiter count was incremented successfully.
			// Block until runtime_Semrelease in Add wakes this goroutine.
			runtime_Semacquire(semap)
			// Reuse check: the goroutine that woke us ran Add, which reset the state
			// with *statep = 0 before releasing the semaphore. A nonzero state here
			// means the WaitGroup was reused before this Wait returned.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

Summary

To understand the WaitGroup source code, we need some prerequisite knowledge, such as semaphores, memory alignment, atomic operations, shift operations and pointer conversion.

In fact, the idea behind WaitGroup's implementation is quite simple. The struct field state1 maintains two counters and a semaphore: counter, the number of child goroutines added via Add(); the waiter count, the number of goroutines blocked in Wait(); and the semaphore, used to block and wake waiters. When Add(n) is called with a positive n, counter += n, meaning n child goroutines have been added to perform tasks. After finishing its task, each child goroutine must call Done() to decrement counter by 1. When the last child goroutine finishes, counter becomes 0, and the waiters blocked in Wait() must be woken up.

However, there are several points to note when using WaitGroup:

  • The total added to counter via Add() must equal the total later subtracted via Done(). If the former is larger, goroutines blocked in Wait() will never be woken up; if the latter is larger, a panic is triggered.
  • Add() calls that increase the counter should be executed first, before the corresponding Wait() (typically before starting the goroutines).
  • Do not copy a WaitGroup object.
  • To reuse a WaitGroup, new Add() calls must be made only after all previous Wait() calls have returned.
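These usage rules can be seen together in a typical fan-out pattern. sumSquares is a hypothetical helper: each Add(1) runs before its go statement, each goroutine defers Done so that it is called exactly once, and the WaitGroup is shared through the closure rather than copied.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares each number in its own goroutine and sums the results.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make([]int, len(nums))
	for i, n := range nums {
		wg.Add(1) // before `go`, so Wait cannot see a zero counter too early
		// i and n are passed as arguments to avoid sharing the loop variables
		// (needed before Go 1.22).
		go func(i, n int) {
			defer wg.Done()     // runs exactly once per Add(1)
			results[i] = n * n // each goroutine writes only its own slot
		}(i, n)
	}
	wg.Wait() // every Add(1) has a matching Done, so this returns
	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```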

Copyright notice
This article was written by [Machine bell chopper]; please include a link to the original when reposting. Thanks.
https://cdmana.com/2021/09/20210909124112676b.html
