If each thread needs to resume work, whenever any thread finds some new information, how to wait until all threads have finished?

A Seemingly Simple Synchronization Problem

TL;DR

Several threads depend on each other. Whenever one of them finds some new information, all of them need to process that information. How can we determine that all threads are ready?

Background

I have (almost) parallelized a function Foo(input) that solves a problem which is known to be P-complete and may be thought of as some type of search. Unsurprisingly, so far nobody has managed to successfully exploit parallelism beyond two threads for this problem. However, I had a promising idea and managed to implement it fully, except for this seemingly simple problem.

Details

Information is exchanged between the threads implicitly via a shared graph-like database g of type G, so the threads have all information immediately and do not really need to notify each other explicitly. More precisely, each time a piece of information i is found by some thread, that thread calls a thread-safe function g.addInformation(i), which, among other things, essentially places i at the end of some array. One aspect of my new implementation is that a thread can use an information i during its search even before i has been enqueued at the end of the array. Nevertheless, each thread needs to process i separately once it has been enqueued in that array. Enqueueing i may happen after the thread that added i has returned from g.addInformation(i), because some other thread may take over the responsibility to enqueue i.

Each thread s calls a function s.processAllInformation() to process all information in that array in g, in order. A call to s.processAllInformation() by some thread is a no-op, i.e. does nothing, if that thread has already processed all information or there is no (new) information.

As soon as a thread has finished processing all information, it should wait for all other threads to finish, and it should resume work if any of the other threads finds some new information i. That is, each time some thread calls g.addInformation(i), all threads that had finished processing all previously known information need to resume work and process the newly added information (and any other that has appeared since).

My Problem

Every solution I could think of does not work; each suffers from a variation of the same problem: one thread finishes processing all information and then sees that all other threads are ready, too. Hence this thread leaves. But then another thread notices that some new information has been added, resumes work and finds a new piece of information, which the thread that has already left never processes.

A solution to this problem may be straightforward, but I cannot think of one. Ideally, a solution should not depend on time-consuming operations during a call to g.addInformation(i) whenever new information is found, because of how often this situation is predicted to occur (1 or 2 million times per second, see below).

Even more background

In my initially sequential application, the function Foo(input) is called roughly 100k times per second on modern hardware, and my application spends 80% to 90% of its time executing Foo(input). Actually, all calls to Foo(input) depend on each other; we search for something in a very large space in an iterative manner. Solving a reasonably sized problem typically takes one to two hours with the sequential version of the application.

Each time Foo(input) is called, between zero and several hundred new pieces of information are found. On average, 1 or 2 million pieces of information are found per second during the execution of my application, i.e. 10 to 20 new pieces of information per call to Foo(input). All of these statistics probably have a very high standard deviation (which I haven't measured yet, though).

Currently, I am writing a prototype of the parallel version of Foo(input) in Go, so I prefer answers in Go. The sequential application is written in C (actually C++, but written like a C program), so answers in C, C++ or pseudo-code are no problem either. I haven't benchmarked my prototype yet, since wrong code is infinitely slower than slow code.

Code

These code examples are meant to clarify the problem. Since I haven't solved it, feel free to consider any changes to the code. (I appreciate unrelated helpful remarks, too.)

Global situation

We have some type G, and Foo() is a method of G. If g is an object of type G and g.Foo(input) is called, g creates some workers s[1], ..., s[g.numThreads] that obtain a pointer to g, so that they have access to the member variables of g and are able to call g.addInformation(i) whenever they find a new piece of information. Then the method FooInParallel() is called in parallel for each worker s[j].

type G struct {
  s           []worker
  numThreads  int

  // some data, that the workers need access to
}

func (g *G) initializeWith(input InputType) {
  // Some code...
}

func (g *G) Foo(input InputType) int {
  // Initialize data-structures:
  g.initializeWith(input)

  // Initialize workers:
  g.s = make([]worker, g.numThreads)
  for j := range g.s {
    g.s[j] = newWorker(g) // workers get a pointer to g
  }

  // Note: This wait group doesn't solve the problem. See remark below.
  wg := new(sync.WaitGroup)
  wg.Add(g.numThreads)
 
  // Actual computation in parallel:
  for j := 0; j < g.numThreads-1; j++ {
    // Start g.numThreads - 1 goroutines in parallel.
    go g.s[j].FooInParallel(wg)
  }

  // The last worker runs in this goroutine, so that we have
  // g.numThreads goroutines in total.
  g.s[g.numThreads-1].FooInParallel(wg)

  wg.Wait()
  return 0 // placeholder; computing the actual result is omitted in this sketch
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread 
// cleans up any mess the others leave behind (and even 
// in bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.
  // Step 3: Possibly, call g.notifyAll()
}

// If new information has been added, we must ensure 
// that every thread that had finished resumes work 
// and processes any newly added information. 
func (g *G) notifyAll() {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding
   // section. It doesn't work, though.
}

// If a thread has finished processing all information,
// it must ensure that all threads have finished and
// that no new information has been added since.
func (g *G) allThreadsReady() bool {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding
   // section. It doesn't work, though.
}

Remark: The only purpose of the wait group is to ensure Foo(input) is not called again before the last worker has returned. However, you can completely ignore this.

Local Situation

Each worker contains a pointer to the global data structure and searches for either a treasure or new information until it has processed all information that has been enqueued by this or other threads. If it finds a new piece of information i, it calls g.addInformation(i) and continues its search. If it finds a treasure, it sends the treasure via a channel it has obtained as an argument and returns. If all threads have finished processing all information, each of them can send a dummy treasure to the channel and return. However, determining whether all threads are ready is exactly my problem.

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    _ = s.processAllInformation() // treasure handling omitted in this sketch
    
    // The following is the problem. Feel free to make any 
    // changes to the following block.
    s.notifyAll()
    for !s.needsToResumeWork() {
      if s.allThreadsReady() {
        return
      }
    }

  }
}

func (s *worker) notifyAll() {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding
  // section. It doesn't work, though.

  // An example: 
  // Step 1: Possibly, do something else first.
  // Step 2: Call g.notifyAll()
}

func (s *worker) needsToResumeWork() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding
  // section. It doesn't work, though.
}

func (s *worker) allThreadsReady() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding
  // section. It doesn't work, though.

  // If all threads are ready, return true. 
  // Otherwise, return false.

  // Alternatively, spin as long as no new information
  // has been added, and return false as soon as some
  // new information has been added, or true if no new
  // information has been added and all other threads
  // are ready.
  // 
  // However, this doesn't really matter, because a 
  // function call to processAllInformation is cheap
  // if no new information is available.
}

// A call to this function is cheap if no new work has
// been added since the last call.
func (s *worker) processAllInformation() treasureType {
  // Access member variables of g and search
  // for information or treasures. 

  // If a new piece of information i is found, call
  // g.addInformation(i).

  // Return once all information that has been
  // enqueued to g has been processed by this thread.
}

My best attempt to solve the problem

Well, by now I am rather tired, so I may need to double-check my solution later. However, even my "correct" attempts don't work. So, to give you an idea of what I have been trying so far (among many other things), I share it immediately.

I tried the following. Each worker contains a member variable needsToResumeWorkFlag that is atomically set to one whenever new information has been added. Setting this member variable to one several times does no harm; it is only important that the thread resumes work after the last piece of information has been added.

To reduce the workload for a thread calling g.addInformation(i) whenever a piece of information i is found, instead of notifying all threads individually, the thread that enqueues the information (which is not necessarily the thread that called g.addInformation(i)) afterwards sets a member variable notifyAllFlag of g to one, indicating that all threads need to be notified about the latest information.

Whenever a thread that has finished processing all enqueued information calls the function g.notifyAll(), it checks whether the member variable notifyAllFlag is set to one. If so, it tries to atomically compare-and-swap g.allInformedFlag from 1 to 0. If it could not write g.allInformedFlag, it assumes some other thread has taken over the responsibility to inform all threads. If the operation succeeds, this thread has taken over the responsibility to notify all threads and proceeds to do so by setting the member variable needsToResumeWorkFlag to one for every thread. Afterwards, it atomically sets g.numThreadsReady and g.notifyAllFlag to zero, and g.allInformedFlag back to 1.

type G struct {
  s                []worker // needed by notifyAll below
  numThreads       int
  numThreadsReady  *uint32 // initialize to 0 somewhere appropriate
  notifyAllFlag    *uint32 // initialize to 0 somewhere appropriate
  allInformedFlag  *uint32 // initialize to 1 somewhere appropriate (1 is not a typo)

  // some data, that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread 
// cleans up any mess the others leave behind (and even 
// in bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.

  // Since the responsibility to enqueue an information may
  // be passed to another thread, it is important that the
  // last step is executed by the thread which enqueues the 
  // information(s), in order to ensure that the information
  // has successfully been enqueued.

  // Step 3:
  atomic.StoreUint32(g.notifyAllFlag,1)        // all threads need to be notified
}

// If new information has been added, we must ensure 
// that every thread that had finished resumes work 
// and processes any newly added information. 
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAllFlag) == 1 {
    // Somebody needs to notify all threads.
    if atomic.CompareAndSwapUint32(g.allInformedFlag, 1, 0) {
      // This thread has taken over the responsibility to inform
      // all other threads. All threads are hindered from accessing 
      // their member variable s.needsToResumeWorkFlag.
      for j := range g.s {
        atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
      }
      atomic.StoreUint32(g.notifyAllFlag, 0)
      atomic.StoreUint32(g.numThreadsReady, 0)
      atomic.StoreUint32(g.allInformedFlag, 1)
    } else {
      // Some other thread has taken over the responsibility
      // to inform all threads. 
    }
  }
}

Whenever a thread finishes processing all enqueued information, it checks whether it needs to resume work by atomically comparing-and-swapping its member variable needsToResumeWorkFlag from 1 to 0. However, since one of the threads is responsible for notifying all others, it cannot do so immediately.

First, it must call the function g.notifyAll(), and then it must check whether the latest thread to call g.notifyAll() has finished notifying all threads. Hence, after calling g.notifyAll() it must spin until g.allInformedFlag is one before it checks whether its member variable s.needsToResumeWorkFlag is one, in which case it atomically sets it to zero and resumes work. (I guess there is a mistake here, but I also tried several other things without success.) If s.needsToResumeWorkFlag is already zero, it atomically increments g.numThreadsReady by one, if it hasn't done so before. (Recall that g.numThreadsReady is reset during a call to g.notifyAll().) Then it atomically checks whether g.numThreadsReady equals g.numThreads, in which case it can leave (after sending a dummy treasure to the channel). Otherwise, we start over until either this thread has been notified (possibly by itself) or all threads are ready.

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // If new work has been added, the thread
  // is notified by setting the uint32 
  // at which needsToResumeWorkFlag points to 1.
  needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    _ = s.processAllInformation() // treasure handling omitted in this sketch

    numReadyIncremented := false
    for !s.needsToResumeWork() {
      if !numReadyIncremented {
        atomic.AddUint32(s.g.numThreadsReady, 1)
        numReadyIncremented = true
      }
      if s.allThreadsReady() {
        return
      }
    }

  }
}

func (s *worker) needsToResumeWork() bool {
  s.notifyAll()
  for {
    if atomic.LoadUint32(s.g.allInformedFlag) == 1 {
      return atomic.CompareAndSwapUint32(s.needsToResumeWorkFlag, 1, 0)
    }
  }
}

func (s *worker) notifyAll() {
  s.g.notifyAll()
}

func (s *worker) allThreadsReady() bool {
  return atomic.LoadUint32(s.g.numThreadsReady) == uint32(s.g.numThreads)
}

As mentioned, my solution doesn't work.

Answer 1 (score: 3)

I found a solution myself. We exploit the fact that a call to s.processAllInformation() does nothing (and is cheap) if no new information has been added. The trick is to use an atomic variable as a lock, both for each thread to notify all others if necessary and to check whether it has itself been notified; if the lock cannot be acquired, the thread simply calls s.processAllInformation() again. A thread then uses the notifications to decide whether it has to increment the counter of ready threads, rather than whether it needs to resume work.

Global situation

type G struct {
  s                []worker // needed by notifyAll below
  numThreads       int
  numThreadsReady  *uint32 // initialize to 0 somewhere appropriate
  notifyAllFlag    *uint32 // initialize to 0 somewhere appropriate
  allCanGoFlag     *uint32 // initialize to 0 somewhere appropriate
  lock             *uint32 // initialize to 0 somewhere appropriate

  // some data, that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread 
// cleans up any mess the others leave behind (and even 
// in bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.

  // Since the responsibility to enqueue an information may
  // be passed to another thread, it is important that the
  // last step is executed by the thread which enqueues the 
  // information(s), in order to ensure that the information
  // has successfully been enqueued.

  // Step 3:
  atomic.StoreUint32(g.notifyAllFlag, 1) // all threads need to be notified
}

// If new information has been added, we must ensure 
// that every thread that had finished resumes work 
// and processes any newly added information. 
//
// This function is not thread-safe. Make sure not to 
// have several threads call this function concurrently
// if these calls are not guarded by some lock.
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAllFlag) == 1 {
    for j := range g.s {
      atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
    }
    atomic.StoreUint32(g.notifyAllFlag, 0)
    atomic.StoreUint32(g.numThreadsReady, 0)
  }
}

Local situation

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g *G

  // If new work has been added, the thread
  // is notified by setting the uint32 
  // at which needsToResumeWorkFlag points to 1.
  needsToResumeWorkFlag   *uint32 // initialize to 0 somewhere appropriate
  incrementedNumReadyFlag *uint32 // initialize to 0 somewhere appropriate

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    _ = s.processAllInformation() // treasure handling omitted in this sketch

    if atomic.LoadUint32(s.g.allCanGoFlag) == 1 {
      return
    }

    if atomic.CompareAndSwapUint32(s.g.lock, 0, 1) { // If possible, lock.
      s.g.notifyAll() // It is essential that this is also guarded by the lock.

      if atomic.LoadUint32(s.needsToResumeWorkFlag) == 1 {
        atomic.StoreUint32(s.needsToResumeWorkFlag, 0)

        // Some new information was found, and this thread can't be sure
        // whether it has already processed it. Since the counter of
        // ready threads has been reset, we must increment that counter
        // after the next call to processAllInformation() in the
        // following iteration.
        atomic.StoreUint32(s.incrementedNumReadyFlag, 0)

      } else {

        // Increment the number of ready threads by one, if this thread
        // has not done so before (since the last newly found information).
        if atomic.CompareAndSwapUint32(s.incrementedNumReadyFlag, 0, 1) {
          atomic.AddUint32(s.g.numThreadsReady, 1)
        }

        // If all threads are ready, give them all a signal.
        if atomic.LoadUint32(s.g.numThreadsReady) == uint32(s.g.numThreads) {
          atomic.StoreUint32(s.g.allCanGoFlag, 1)
        }

      }

      atomic.StoreUint32(s.g.lock, 0) // Unlock.
    }
  }
}

Later I may add some ordering for the threads' access to the lock under heavy contention, but for now that'll do.

huangapple · Published 2021-12-31 05:03:48 · Original link: https://go.coder-hub.com/70536680.html