1 // Copyright 2011 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package sync 6 7 import ( 8 "internal/race" 9 "sync/atomic" 10 "unsafe" 11 ) 12 13 // A WaitGroup waits for a collection of goroutines to finish. 14 // The main goroutine calls Add to set the number of 15 // goroutines to wait for. Then each of the goroutines 16 // runs and calls Done when finished. At the same time, 17 // Wait can be used to block until all goroutines have finished. 18 // 19 // A WaitGroup must not be copied after first use. 20 // 21 // In the terminology of the Go memory model, a call to Done 22 // “synchronizes before” the return of any Wait call that it unblocks. 23 type WaitGroup struct { 24 noCopy noCopy 25 26 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. 27 // 64-bit atomic operations require 64-bit alignment, but 32-bit 28 // compilers only guarantee that 64-bit fields are 32-bit aligned. 29 // For this reason on 32 bit architectures we need to check in state() 30 // if state1 is aligned or not, and dynamically "swap" the field order if 31 // needed. 32 state1 uint64 33 state2 uint32 34 } 35 36 // state returns pointers to the state and sema fields stored within wg.state*. 37 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { 38 if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { 39 // state1 is 64-bit aligned: nothing to do. 40 return &wg.state1, &wg.state2 41 } else { 42 // state1 is 32-bit aligned but not 64-bit aligned: this means that 43 // (&state1)+4 is 64-bit aligned. 44 state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) 45 return (*uint64)(unsafe.Pointer(&state[1])), &state[0] 46 } 47 } 48 49 // Add adds delta, which may be negative, to the WaitGroup counter. 50 // If the counter becomes zero, all goroutines blocked on Wait are released. 51 // If the counter goes negative, Add panics. 52 // 53 // Note that calls with a positive delta that occur when the counter is zero 54 // must happen before a Wait. Calls with a negative delta, or calls with a 55 // positive delta that start when the counter is greater than zero, may happen 56 // at any time. 57 // Typically this means the calls to Add should execute before the statement 58 // creating the goroutine or other event to be waited for. 59 // If a WaitGroup is reused to wait for several independent sets of events, 60 // new Add calls must happen after all previous Wait calls have returned. 61 // See the WaitGroup example. 62 func (wg *WaitGroup) Add(delta int) { 63 statep, semap := wg.state() 64 if race.Enabled { 65 _ = *statep // trigger nil deref early 66 if delta < 0 { 67 // Synchronize decrements with Wait. 68 race.ReleaseMerge(unsafe.Pointer(wg)) 69 } 70 race.Disable() 71 defer race.Enable() 72 } 73 state := atomic.AddUint64(statep, uint64(delta)<<32) 74 v := int32(state >> 32) 75 w := uint32(state) 76 if race.Enabled && delta > 0 && v == int32(delta) { 77 // The first increment must be synchronized with Wait. 78 // Need to model this as a read, because there can be 79 // several concurrent wg.counter transitions from 0. 80 race.Read(unsafe.Pointer(semap)) 81 } 82 if v < 0 { 83 panic("sync: negative WaitGroup counter") 84 } 85 if w != 0 && delta > 0 && v == int32(delta) { 86 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 87 } 88 if v > 0 || w == 0 { 89 return 90 } 91 // This goroutine has set counter to 0 when waiters > 0. 92 // Now there can't be concurrent mutations of state: 93 // - Adds must not happen concurrently with Wait, 94 // - Wait does not increment waiters if it sees counter == 0. 95 // Still do a cheap sanity check to detect WaitGroup misuse. 96 if *statep != state { 97 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 98 } 99 // Reset waiters count to 0. 100 *statep = 0 101 for ; w != 0; w-- { 102 runtime_Semrelease(semap, false, 0) 103 } 104 } 105 106 // Done decrements the WaitGroup counter by one. 107 func (wg *WaitGroup) Done() { 108 wg.Add(-1) 109 } 110 111 // Wait blocks until the WaitGroup counter is zero. 112 func (wg *WaitGroup) Wait() { 113 statep, semap := wg.state() 114 if race.Enabled { 115 _ = *statep // trigger nil deref early 116 race.Disable() 117 } 118 for { 119 state := atomic.LoadUint64(statep) 120 v := int32(state >> 32) 121 w := uint32(state) 122 if v == 0 { 123 // Counter is 0, no need to wait. 124 if race.Enabled { 125 race.Enable() 126 race.Acquire(unsafe.Pointer(wg)) 127 } 128 return 129 } 130 // Increment waiters count. 131 if atomic.CompareAndSwapUint64(statep, state, state+1) { 132 if race.Enabled && w == 0 { 133 // Wait must be synchronized with the first Add. 134 // Need to model this is as a write to race with the read in Add. 135 // As a consequence, can do the write only for the first waiter, 136 // otherwise concurrent Waits will race with each other. 137 race.Write(unsafe.Pointer(semap)) 138 } 139 runtime_Semacquire(semap) 140 if *statep != 0 { 141 panic("sync: WaitGroup is reused before previous Wait has returned") 142 } 143 if race.Enabled { 144 race.Enable() 145 race.Acquire(unsafe.Pointer(wg)) 146 } 147 return 148 } 149 } 150 } 151