有没有办法让协程通道在接收时按特定顺序而不是先进先出?

huangapple go评论107阅读模式
英文:

Is there any way to make coroutine channel follow a specific order rather than first-in first-out during receiving

问题

我有一些任务,需要执行三个步骤,第一和第三步必须按顺序运行,只有第二步可以并发运行,使用 Kotlin 的 Channel 以先进先出的顺序,我无法始终让第三步按顺序运行,是否有任何实现可以满足我的目的,或者有其他的数据结构?

以下是我的示范代码:

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.random.Random

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step2Channel = Channel<Task>(2)
    private val step3Channel = Channel<Task>()

    fun start() {
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                task.step1()
                step2Channel.send(task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step2Channel.receive()
                coroutineScope.launch(Dispatchers.IO) {
                    task.step2()
                    step3Channel.send(task)
                }
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step3Channel.receive()
                task.step3()
            }
        }
    }

    inner class Task(val index: Int) {
        fun step1() {
            Log.i(TAG, "执行步骤1[$index] 顺序执行")
            // 模拟长时间运行的任务
            Thread.sleep(randomDelay.nextLong(100, 200))
        }

        fun step2() {
            Log.i(TAG, "执行步骤2[$index] 并发执行")
            if (index == 2 || index == 5 || index == 8 || index == 16) {
                // 使特定任务运行时间较长以混淆步骤3的顺序
                Thread.sleep(randomDelay.nextLong(2000, 5000))
            } else {
                Thread.sleep(randomDelay.nextLong(100, 300))
            }
        }

        fun step3() {
            Log.i(TAG, "执行步骤3[$index] 顺序执行")
            Thread.sleep(randomDelay.nextLong(100, 300))
        }
    }

    companion object {
        private const val TAG = "SortChannel"
    }
}
英文:

I have some tasks which along with three step to execute, the first&third steps must run in sequentially, only the second step run in concurrently, giving the kotlin Channel followed first-in first-out order, I can't always make the step3 run in sequentially, does any imlementation can fit my purpose, or another data structure?

below is my demonstrate code:

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.random.Random

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step2Channel = Channel&lt;Task&gt;(2)
    private val step3Channel = Channel&lt;Task&gt;()

    fun start() {
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                task.step1()
                step2Channel.send(task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step2Channel.receive()
                coroutineScope.launch(Dispatchers.IO) {
                    task.step2()
                    step3Channel.send(task)
                }
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step3Channel.receive()
                task.step3()
            }
        }
    }

    inner class Task(val index: Int) {
        fun step1() {
            Log.i(TAG, &quot;execute step1[$index] sequential&quot;)
            // sleep to simulating a long run task
            Thread.sleep(randomDelay.nextLong(100, 200))
//            Log.i(TAG, &quot;step1[$index] executed&quot;)
        }

        fun step2() {
            Log.i(TAG, &quot;execute step2[$index] concurrently&quot;)
            if (index == 2 || index == 5 || index == 8 || index == 16) {
                // make specific tasks run longer to mess the step3&#39;s order
                Thread.sleep(randomDelay.nextLong(2000, 5000))
            } else {
                Thread.sleep(randomDelay.nextLong(100, 300))
            }
//            Log.i(TAG, &quot;step2[$index] executed&quot;)
        }

        fun step3() {
            Log.i(TAG, &quot;execute step3[$index] sequential&quot;)
            Thread.sleep(randomDelay.nextLong(100, 300))
//            Log.i(TAG, &quot;step3[$index] executed&quot;)
        }
    }

    companion object {
        private const val TAG = &quot;SortChannel&quot;
    }
}

答案1

得分: 3

如果我理解正确:

  • 步骤1、2和3必须按顺序进行
  • 步骤1和3必须按照它们在源列表中的顺序,依次处理任务列表中的每个项目。

一种方法是消除中间的通道,并将次要作业和任务一起发送到最终通道:

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step3Channel = Channel<Pair<Job, Task>>(capacity = tasks.size)

    fun start() {
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                task.step1()
                val step2Job = coroutineScope.launch { task.step2() }
                step3Channel.send(step2Job to task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val (step2Job, task) = step3Channel.receive()
                step2Job.join()
                task.step3()
            }
        }
    }

    //...
}

请注意,我已经将HTML编码中的&lt;&gt;替换为了正常的尖括号,以便代码更加清晰。

英文:

If I understand correctly:

  • Steps 1, 2, and 3 must happen sequentially
  • Steps 1 and 3 must be processed sequentially for each of the items of the task list in the same order they were in the source list.

One way to do this is to eliminate the intermediary Channel and instead sending both the secondary job and the task together to the final Channel:

class SortingReceiveChannelTest(
private val coroutineScope: CoroutineScope
) {
private val tasks = List(30) { Task(it) }
private val randomDelay = Random(Int.MAX_VALUE)
private val step3Channel = Channel&lt;Pair&lt;Job, Task&gt;&gt;(capacity = tasks.size)
fun start() {
coroutineScope.launch(Dispatchers.IO) {
for (task in tasks) {
task.step1()
val step2Job = coroutineScope.launch { task.step2() }
step3Channel.send(step2Job to task)
}
}
coroutineScope.launch(Dispatchers.IO) {
repeat(tasks.size) {
val (step2Job, task) = step3Channel.receive()
step2Job.join()
task.step3()
}
}
}
//...
}

答案2

得分: 0

受到Tenfour04的回答启发,我稍作调整,添加了Semaphore以防止step1运行过快,因为在实际情况下可能会占用大量内存。

import kotlinx.coroutines.sync.Semaphore

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step3Channel = Channel<Pair<Job, Task>>(capacity = tasks.size)

    fun start() {
        val semaphore = Semaphore(5)
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                semaphore.acquire()
                task.step1()
                step3Channel.send(launch {
                    task.step2()
                    semaphore.release()
                } to task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val (step2Job, task) = step3Channel.receive()
                step2Job.join()
                task.step3()
            }
        }
    }

    //...
}

这是您提供的Kotlin代码的翻译部分。

英文:

inspired from Tenfour04's anwser, I tweak a bit, added Semaphore to prevent step1 run too fast as it probably claim many memory to use in product case

import kotlinx.coroutines.sync.Semaphore

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step3Channel = Channel&lt;Pair&lt;Job, Task&gt;&gt;(capacity = tasks.size)

    fun start() {
        val semaphore = Semaphore(5)
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                semaphore.acquire()
                task.step1()
                step3Channel.send(launch {
                    task.step2()
                    semaphore.release()
                } to task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val (step2Job, task) = step3Channel.receive()
                step2Job.join()
                task.step3()
            }
        }
    }

    //...
}

huangapple
  • 本文由 发表于 2023年7月13日 22:37:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76680603.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定