If each thread needs to resume work, whenever any thread finds some new information, how to wait until all threads have finished?

If each thread needs to resume work, whenever any thread finds some new information, how to wait until all threads have finished?






















A Seemingly Simple Synchronization Problem


Several threads depend on each other. Whenever one of them finds some new information, all of them need to process that information. How to determine, that all threads are ready?


I have (almost) parallelized a function Foo(input) that solves a problem, which is known to be P-complete and may be thought of as some type of search. Unsurprisingly, so far nobody has managed to successfully exploit parallelism beyond two threads for solving that problem. However, I had a promising idea and managed to fully implement it, except for this seemingly simply problem.


Information between each of the threads is exchanged implicitly using some kind of shared graph-like database g of type G, such that the threads have all informations immediately and do not really need to notify each other explicitly. More precisely, each time an information i is found by some thread, that thread calls a thread-safe function g.addInformation(i) which among other things basically places the information i at the end of some array. One aspect of my new implementation is, that threads can use an information i during their search even before i has been enqueued at the end of the array. Nevertheless, each thread needs to additionally process the information i separately after it has been enqueued in that array. Enqueueing i may happen after the thread who added i has returned from g.addInformation(i). This is because some other thread may take over responsibility to enqueue i.

Each thread s calls a function s.ProcessAllInformation() in order to processes all information in that array in g in order. A call to s.ProcessAllInformation by some thread is a noop, i.e. does nothing, if that thread has already processed all informations or there was no (new) informations.

As soon as a thread finished processing all informations, it should wait for all other threads to finish. And it should resume work if any of the other threads finds some new information i. I.e. each time some thread calls g.addInformation(i) all threads that had finished processing all previously known informations, need to resume their work and process that (and any other) newly added information.

My Problem

Any solution I could think does not work and suffers from a variation of the same problem: One thread finished processing all informations and then sees all other threads are ready, too. Hence, this thread leaves. But then another thread notices some new information had been added, resumes work and finds a new information. The new information is then not processed by the thread that has already left.

A solution to this problem may be straight forward, but I can not think of one. Ideally a solution to this problem should not depend on time-consuming operations during a function call to g.addInformation(i) whenever a new information is found, because of how many times a second this situation is predicted to appear (1 or 2 Million times per second, see below).

Even more background

In my initially sequential application the function Foo(input) is called roughly 100k times a second on modern hardware and my application spends 80% to 90% of time executing Foo(input). Actually, all function calls to Foo(input) depend on each other, we kind of search for something in a very large space in an iterative manner. Solving a reasonable-sized problem typically takes about one or two hours when using the sequential version of the application.

Each time Foo(input) is called between zero and many hundred new informations are found. On average during the execution of my application 1 or 2 million informations are found per second, i.e. we find 10 to 20 new informations on each function call to Foo(input). All of these statistics probably have a very high standard deviation (which i didn't yet measure, though).

Currently I am writing a prototype for the parallel version of Foo(input) in go. I prefer answers in go. The sequential application is written in C (actually it's C++, but its written like a program in C). So answers in C or C++ (or pseudo-code) are no problem. I haven't benchmarked my prototype, yet, since wrong code is infinitely slower than slow code.


This code examples are in order to clarify. Since I haven't solved the problem feel free to consider any changes to the code. (I appreciate unrelated helpful remarks, too.)

Global situation

We have some type G and Foo() is a method of G. If g is an object of type G and when g.Foo(input) is called, g creates some workers s[1], ..., s[g.numThreads] that obtain a pointer to g, such that these have access to the member variables of g and are able to call g.addInformation(i) whenever they find a new information. Then for each worker s[j] a method FooInParallel() is called in parallel.

type G struct {
  s           []worker
  numThreads  int

  // some data, that the workers need access to

func (g *G) initializeWith(input InputType) {
  // Some code...

func (g *G) Foo(input InputType) int {
  // Initialize data-structures:

  // Initialize workers:
  g.s := make([]worker, g.numThreads)
  for j := range g.s {
    g.s[j] := newWorker(g) // workers get a pointer to g

  // Note: This wait group doesn&#39;t solve the problem. See remark below.
  wg := new(sync.WaitGroup)
  // Actual computation in parallel:
  for j := 0 ; j &lt; g.numThreads - 1 ; j++ {
    // Start g.numThread - 1 go-routines in parrallel
    go g.s[j].FooInParallel(wg)

  // Last thread is this go-routine, such that we have
  // g.numThread go-routines in total.


// This function is thread-safe in so far as several
// workers can concurrently add information.
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.
  // Step 3: Possibly, call g.notifyAll()

// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
func (g *G) notifyAll() {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding.
   // section. It doesn&#39;t work, though.

// If a thread has finished processing all information
// it must ensure that all threads have finished and
// that no new information have been added since.
func (g *G) allThreadsReady() bool {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding.
   // section. It doesn&#39;t work, though.

Remark: The only purpose of the wait group is to ensure Foo(input) is not called again before the last worker has returned. However, you can completely ignore this.

Local Situation

Each worker contains a pointer to the global data-structure and searches for either a treasure or new informations until it has processed all information that have been enqueued by this or other threads. If it finds a new information i it calls the function g.addInformation(i) and continues its search. If it finds a treasure it sends the treasure via a channel it has obtained as an argument and returns. If all threads are ready with processing all information, each of them can send a dummy-treasure to the channel and return. However, determining whether all threads are ready is exactly my problem.

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // Also contains some other stuff. 

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()
    // The following is the problem. Feel free to make any 
    // changes to the following block.
    for !s.needsToResumeWork() {
      if s.allThreadsReady() {


func (s *worker) notifyAll() {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn&#39;t work, though.

  // An example: 
  // Step 1: Possibly, do something else first.
  // Step 2: Call g.notifyAll()

func (s *worker) needsToResumeWork() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn&#39;t work, though.

func (s *worker) allThreadsReady() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn&#39;t work, though.

  // If all threads are ready, return true. 
  // Otherwise, return false.

  // Alternatively, spin as long as no new information
  // has been added, and return false as soon as some
  // new information has been added, or true if no new
  // information has been added and all other threads
  // are ready.
  // However, this doesn&#39;t really matter, because a 
  // function call to processAllInformation is cheap
  // if no new informations are available.

// A call to this function is cheap if no new work has
// been added since the last function call.
func (s *worker) processAllInformation() treasureType {
  // Access member variables of g and search
  // for information or treasures. 

  // If a new information i is found, calls the
  // function g.addInformation(i).

  // If all information that have been enqueued to
  // g have been processed by this thread, returns.

My best attempt to solve the problem

Well, by now, I am rather tired, so I might need to double-check my solution later. However, even my correct attempt doesn't work. So in order to give you an idea of what I have been trying so far (among many other things), I share it immediately.

I tried the following. Each of the workers contains a member variable needsToResumeWork, that is atomically set to one whenever a new information has been added. Several times setting this member variable to one does not do harm, it is only important that the thread resumes work after the last information has been added.

In order to reduce work load for a thread calling g.addInformation(i) whenever an information i is found, instead of notifying all threads individually, the thread that enqueues the information (that is not necessarily the thread that called g.addInformation(i)) afterwards sets a member variable notifyAllFlag of g to one, which indicates that all threads need to be notified about the latest information.

Whenever a thread that has finished processing all information that had been enqueued calls the function g.notifyAll(), it checks whether the member variable notifyAllFlag is set to one. If so it tries to atomically compare g.allInformedFlag with 1 and swap with 0. If it could not write g.allInformedFlag it assumes some other thread has taken the responsibility to inform all threads. If this operation is successful, this thread has taken over responsibility to notify all threads and proceeds to do so by setting the member variable needsToResumeWorkFlag to one for every thread. Afterwards it atomically sets g.numThreadsReady and g.notifyAllFlag to zero, and g.allInformedFlag to 1.

type G struct {
  numThreads       int
  numThreadsReady      *uint32 // initialize to 0 somewhere appropriate
  notifyAllFlag        *uint32 // initialize to 0 somewhere appropriate
  allInformedFlag      *uint32 // initialize to 1 somewhere appropriate (1 is not a typo)

  // some data, that the workers need access to

// This function is thread-safe in so far as several
// workers can concurrently add information.
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.

  // Since the responsibility to enqueue an information may
  // be passed to another thread, it is important that the
  // last step is executed by the thread which enqueues the 
  // information(s) in order to ensure, that the information
  // successfully has been enqueued.

  // Step 3:
  atomic.StoreUint32(g.notifyAllFlag,1)        // all threads need to be notified

// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAll) == 1 {
    // Somebody needs to notify all threads.
    if atomic.CompareAndSwapUint32(g.allInformedFlag, 1, 0) {
      // This thread has taken over the responsibility to inform
      // all other threads. All threads are hindered to access 
      // their member variable s.needsToResumeWorkFlag
      for j := range g.s {
        atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
      atomic.StoreUint32(g.notifyAllFlag, 0)
      atomic.StoreUint32(g.numThreadsReady, 0)
      atomic.StoreUint32(g.allInformedFlag, 1)
    } else {
      // Some other thread has taken responsibility to inform
      // all threads. 

Whenever a thread finishes processing all information that had been enqueued, it checks whether it needs to resume work by atomically comparing its member variable needsToResumeWorkFlag with 1 and swapping with 0. However, since one of the threads is responsible to notify all others, it can not do so immediately.

First, it must call the function g.notifyAll(), and then it must check, whether the latest thread to call g.notifyAll() finished notifying all threads. Hence, after calling g.notifyAll() it must spin until g.allInformed is one, before it checks whether its member variable s.needsToResumeWorkFlag is one and in this case atomically sets it to be zero and resumes work. (I guess here is a mistake, but I also tried several other things here without success.) If s.needsToResumeWorkFlag is already zero, it atomically increments g.numThreadsReady by one, if it hasn't done so before. (Recall that g.numThreadsReady is reset during a function call to g.notifyAll().) then it atomically checks whether g.numThreadsReady is equal to g.numThreads, in which case it can leave (after sending a dummy-treasure to the channel). otherwise we start all over again until either this thread has been notified (possibly by itself) or all threads are ready.

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // If new work has been added, the thread
  // is notified by setting the uint32 
  // at which needsToResumeWorkFlag points to 1.
  needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate

  // Also contains some other stuff. 

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()

    numReadyIncremented := false
    for !s.needsToResumeWork() {
      if !numReadyIncremented {
        numReadyIncremented = true
      if s.allThreadsReady() {


func (s *worker) needsToResumeWork() bool {
  for {
    if atomic.LoadUint32(g.allInformedFlag) == 1 {
      if atomic.CompareAndSwapUint32(s.needsToResumeWorkFlag, 1, 0) {
        return true
      } else {
        return false

func (s *worker) notifyAll() {

func (g *G) allThreadsReady() bool {
  if atomic.LoadUint32(g.numThreadsReady) == g.numThreads {
    return true
  } else {
    return false

As mentioned my solution doesn't work.


得分: 3



type G struct {
  numThreads           int
  numThreadsReady      *uint32 // 在适当的位置初始化为0
  notifyAllFlag        *uint32 // 在适当的位置初始化为0
  allCanGoFlag         *uint32 // 在适当的位置初始化为0
  lock                 *uint32 // 在适当的位置初始化为0

  // 一些工作线程需要访问的数据

// 此函数是线程安全的,因为多个工作线程可以同时添加信息。
// 该函数针对高争用情况进行了优化;大多数线程几乎可以立即离开。一个线程
// 清理其他线程留下的任何混乱(即使在糟糕的情况下,这也不会太多)。
func (g *G) addInformation(i infoType) {
  // 步骤1:使所有线程都可以访问信息。
  // 步骤2:将信息排队到某个数组的末尾。

  // 由于将信息排队的责任可能会传递给另一个线程,因此重要的是
  // 最后一步由将信息排队的线程执行,以确保信息成功排队。

  // 步骤3:
  atomic.StoreUint32(g.notifyAllFlag, 1)        // 所有线程都需要被通知

// 如果添加了新信息,我们必须确保每个已完成的线程都恢复工作并处理任何新添加的信息。
// 此函数不是线程安全的。确保不要同时调用此函数的多个线程
// 如果这些调用没有由某个锁保护。
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAllFlag, 1) {    
    for j := range g.s {
      atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
    atomic.StoreUint32(g.notifyAllFlag, 0)
    atomic.StoreUint32(g.numThreadsReady, 0)


type worker struct {
  // 每个工作线程包含一个指向g的指针
  // 以便可以访问其成员变量,并能够调用
  // 函数g.addInformation(i),一旦找到一些信息i。
  g    *G 

  // 如果添加了新工作,线程将通过将needsToResumeWorkFlag指向的uint32设置为1来收到通知。
  needsToResumeWorkFlag *uint32 // 在适当的位置初始化为0

  incrementedNumReadyFlag *uint32 // 在适当的位置初始化为0

  // 还包含一些其他内容。 

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()

    if atomic.LoadUint32(s.g.allCanGoFlag, 1) {

    if atomic.CompareAndSwapUint32(g.lock, 0, 1) { // 如果可能,锁定。
      s.g.notifyAll() // 重要的是,这也要受到锁的保护。

      if atomic.LoadUint32(s.needsToResumeWorkFlag) == 1 {
        atomic.StoreUint32(s.needsToResumeWorkFlag, 0)

        // 找到了一些新信息,但是该线程不能确定
        // 它是否已经处理过。由于就绪线程的计数器已被重置,
        // 我们必须在以下迭代中的下一次调用processAllInformation()之后
        // 增加该计数器。
        atomic.StoreUint32(s.incrementedNumReadyFlag, 0)

      } else {

        // 如果该线程之前没有执行过此操作(自上次找到新信息以来),
        // 将就绪线程的数量增加一。
        if atomic.CompareAndSwapUint32(s.incrementedNumReadyFlag, 0, 1) {
          atomic.AddUint32(s.g.numThreadsReady, 1)

        // 如果所有线程都准备就绪,给它们发送一个信号。
        if atomic.LoadUint32(s.g.numThreadsReady) == s.g.numThreads {
          atomic.StoreUint32(s.g.allCanGo, 1)


      atomic.StoreUint32(g.lock, 0) // 解锁。




I found a solution myself. We exploit, that a call to s.processAllInformation() does nothing, if no new information had been added (and is cheap). The trick is to use an atomic variable as a lock to both, for each thread to notify all if necessary and to check whether it has been notified. And then to simply call s.processAllInformation() again, if the lock can not be acquired. A thread then uses the notifications to check whether it has to increment the counter of ready threads, instead of to see whether it needs to return work.

Global situation

type G struct {
numThreads           int
numThreadsReady      *uint32 // initialize to 0 somewhere appropriate
notifyAllFlag        *uint32 // initialize to 0 somewhere appropriate
allCanGoFlag         *uint32 // initialize to 0 somewhere appropriate
lock                 *uint32 // initialize to 0 somewhere appropriate
// some data, that the workers need access to
// This function is thread-safe in so far as several
// workers can concurrently add information.
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
// Step 1: Make information available to all threads.
// Step 2: Enqueue information at the end of some array.
// Since the responsibility to enqueue an information may
// be passed to another thread, it is important that the
// last step is executed by the thread which enqueues the 
// information(s) in order to ensure, that the information
// successfully has been enqueued.
// Step 3:
atomic.StoreUint32(g.notifyAllFlag,1)        // all threads need to be notified
// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
// This function is not thread-safe. Make sure not to 
// have several threads call this function concurrently
// if these calls are not guarded by some lock.
func (g *G) notifyAll() {
if atomic.LoadUint32(g.notifyAllFlag,1) {    
for j := range g.s {
atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)

Local situation

type worker struct {
// Each worker contains a pointer to g
// such that it has access to its member
// variables and is able to call the
// function g.addInformation(i) as soon 
// as it finds some information i.
g    *G 
// If new work has been added, the thread
// is notified by setting the uint32 
// at which needsToResumeWorkFlag points to 1.
needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate
incrementedNumReadyFlag *uint32 // initialize to 0 somewhere appropriate
// Also contains some other stuff. 
func (s *worker) FooInParallel(wg *sync.WaitGroup) {
defer wg.Done()
for {
a := s.processAllInformation()
if atomic.LoadUint32(s.g.allCanGoFlag, 1) {
if atomic.CompareAndSwapUint32(g.lock,0,1) { // If possible, lock.
s.g.notifyAll() // It is essential, that this is also guarded by the lock.
if atomic.LoadUint32(s.needsToResumeWorkFlag) == 1 {
// Some new information was found, and this thread can&#39;t be sure,
// whether it already has processed it. Since the counter for
// how many threads are ready had been reset, we must increment
// that counter after the next call processAllInformation() in the 
// following iteration.
} else {
// Increment number of ready threads by one, if this thread had not 
// done this before (since the last newly found information).
if atomic.CompareAndSwapUint32(s.incrementedNumReadyFlag,0,1) {
// If all threads are ready, give them all a signal.
if atomic.LoadUint32(s.g.numThreadsReady) == s.g.numThreads {
atomic.StoreUint32(s.g.allCanGo, 1)
atomic.StoreUint32(g.lock,0) // Unlock.

Later I may add some order for the threads to access to the lock under heavy contention, but for now that'll do.

