“threads can only be started once” 当再次调用带有多线程装饰器的方法时。

huangapple go评论195阅读模式
英文:

"threads can only be started once" when calling a method again with a multi-threading deco

问题

我创建了一个装饰器(deco),用于在多线程中运行多个具有相同方法的任务。

import threading
from functools import wraps

class ThreadingMask:
    '''
    一个装饰器,用于运行具有列表作为第一个参数的动态方法
    将列表分割成多个列表(根据maxThreads),并在多线程中运行它们
    在定义下有一个示例
    '''
    def __init__(self, maxThreads: int = 64):
        self.maxThreads = maxThreads
        self.groups: list[list] = []
        self.threads = []

    def separate_groups(self, items):
        # 初始化组
        i = 0
        while i < self.maxThreads:
            self.groups.append([])
            i += 1
        i = 0
        # 分离组
        for item in items:
            if i == self.maxThreads:
                i = 0
            self.groups[i].append(item)
            i += 1

    def __call__(self, func):
        @wraps(func)
        def group_run(*args, **kwargs):
            if type(args[1]) is list:
                self.separate_groups(args[1])
            for group in self.groups:
                argsSep = list(args)
                argsSep[1] = group
                argsSep = tuple(argsSep)
                th = threading.Thread(target=func, args=argsSep, kwargs=kwargs)
                self.threads.append(th)
            for th in self.threads:
                th.start()
            for th in self.threads:
                th.join()
        return group_run
import time

class Test:
    '''一个示例'''

    def __init__(self):
        self.result = []
        self.lock = threading.Lock()

    @ThreadingMask()
    def runs(self, items, typeLimit, isNot="a"):
        for item in items:
            def get_matched():
                time.sleep(1)
                if type(item) is typeLimit and str(item) != str(isNot):
                    self.lock.acquire()
                    self.result.append(item)
                    self.lock.release()
            get_matched()

if __name__ == "__main__":
    start = time.time()
    a = Test()
    bunchOfItems = ["d", None, True, "", 1, 2, 3, "a", "b", "c", "this", "e", "d", 13, "p", 0.1, 11]
    a.runs(bunchOfItems, str, isNot="b")
    a.result.sort()
    print(a.result)
    b = Test()
    b.runs(bunchOfItems, str, isNot="b")

但是,当第二次调用具有此装饰器的方法时,会出现 RuntimeError: threads can only be started once。有没有一种方法可以在第二次调用方法时启动不同的线程,而不是由第一次调用启动的线程?

英文:

I created a deco to run a bunch of things with same method in multi-threads

import threading
from functools import wraps

class ThreadingMask:
    &#39;&#39;&#39;
    a deco to run dymastic method which has a list as first arg
    will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
    there is an example under defination
    &#39;&#39;&#39;
    def __init__(self, maxThreads: int = 64):
        self.maxThreads = maxThreads
        self.groups: list[list] = []
        self.threads = []

    def separate_groups(self, items):
        # init groups
        i = 0
        while i &lt; self.maxThreads:
            self.groups.append([])
            i += 1
        i = 0
        # separate groups
        for item in items:
            if i == self.maxThreads:
                i = 0
            self.groups[i].append(item)
            i += 1

    def __call__(self, func):
        @wraps(func)
        def group_run(*args, **kwargs):
            if type(args[1]) is list:
                self.separate_groups(args[1])
            for group in self.groups:
                argsSep = list(args)
                argsSep[1] = group
                argsSep = tuple(argsSep)
                th = threading.Thread(target=func, args=(argsSep), kwargs=(kwargs))
                self.threads.append(th)
            for th in self.threads:
                th.start()
            for th in self.threads:
                th.join()
        return group_run

import time
class Test:
    &#39;&#39;&#39;just a sample&#39;&#39;&#39;

    def __init__(self):
        self.result = []
        self.lock = threading.Lock()

    @ThreadingMask()
    def runs(self, items, typeLimit, isNot=&quot;a&quot;):
        for item in items:
            def get_matched():
                time.sleep(1)
                if type(item) is typeLimit and str(item) != str(isNot):
                    self.lock.acquire()
                    self.result.append(item)
                    self.lock.release()
            get_matched()


if __name__ == &quot;__main__&quot;:
    start = time.time()
    a = Test()
    bunchOfItems: list = [&quot;d&quot;, None, True, &quot;&quot;, 1, 2, 3, &quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;this&quot;, &quot;e&quot;, &quot;d&quot;, 13, &quot;p&quot;, 0.1, 11]
    a.runs(bunchOfItems, str, isNot=&quot;b&quot;)
    a.result.sort()
    print(a.result)
    b = Test()
    b.runs(bunchOfItems, str, isNot=&quot;b&quot;)

But RuntimeError: threads can only be started once occurs when the second time call the method with this deco
Is there a way that can start different threads when second time calling the method than the ones started by the first calling?

答案1

得分: 1

你需要在装饰器对象(ThreadingMask)上创建线程的新实例,以便在单独的线程中运行,而不是尝试两次启动相同的线程(你正在使用相同的self.threads对象,而你应该在函数上创建新线程)。

修改后的代码示例:

import threading
import time
from functools import wraps

class ThreadingMask:
    '''
    a deco to run dynamic method which has a list as the first arg
    will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
    there is an example under definition
    '''
    def __init__(self, maxThreads: int = 64):
        self.maxThreads = maxThreads

    def separate_groups(self, items):
        # init groups
        groups = [[] for _ in range(self.maxThreads)]
        # separate groups
        for i, item in enumerate(items):
            groups[i % self.maxThreads].append(item)
        return groups

    def __call__(self, func):
        @wraps(func)
        def group_run(*args, **kwargs):
            if type(args[1]) is list:
                groups = self.separate_groups(args[1])
            else:
                groups = [[]]
            threads = []
            for group in groups:
                args_sep = list(args)
                args_sep[1] = group
                args_sep = tuple(args_sep)
                th = threading.Thread(target=func, args=args_sep, kwargs=kwargs)
                threads.append(th)
            for th in threads:
                th.start()
            for th in threads:
                th.join()
        return group_run

class Test:
    '''just a sample'''

    def __init__(self):
        self.result = []
        self.lock = threading.Lock()

    @ThreadingMask()
    def runs(self, items, typeLimit, isNot="a"):
        for item in items:
            def get_matched():
                time.sleep(1)
                if type(item) is typeLimit and str(item) != str(isNot):
                    with self.lock:
                        self.result.append(item)
            get_matched()

if __name__ == "__main__":
    start = time.time()
    a = Test()
    b = Test()

    bunchOfItems = ["d", None, True, "", 1, 2, 3, "a", "b", "c", "this", "e", "d", 13, "p", 0.1, 11]
    a.runs(bunchOfItems, str, isNot="b")
    b.runs(bunchOfItems, str, isNot="b")

    a.result.sort()
    b.result.sort()
    print(a.result)
    print(b.result)

这是修改后的代码示例。

英文:

You have to create a new instance of your threads on decorator object (ThreadingMask) to run in separate threads, instead you try to launch the same thread twice which you can't (you were using the same self.threads object that you were stocking, instead you should create new thread on function)

Modified code sample :

import threading
import time
from functools import wraps
class ThreadingMask:
&#39;&#39;&#39;
a deco to run dynamic method which has a list as the first arg
will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
there is an example under definition
&#39;&#39;&#39;
def __init__(self, maxThreads: int = 64):
self.maxThreads = maxThreads
def separate_groups(self, items):
# init groups
groups = [[] for _ in range(self.maxThreads)]
# separate groups
for i, item in enumerate(items):
groups[i % self.maxThreads].append(item)
return groups
def __call__(self, func):
@wraps(func)
def group_run(*args, **kwargs):
if type(args[1]) is list:
groups = self.separate_groups(args[1])
else:
groups = [[]]
threads = []
for group in groups:
args_sep = list(args)
args_sep[1] = group
args_sep = tuple(args_sep)
th = threading.Thread(target=func, args=args_sep, kwargs=kwargs)
threads.append(th)
for th in threads:
th.start()
for th in threads:
th.join()
return group_run
class Test:
&#39;&#39;&#39;just a sample&#39;&#39;&#39;
def __init__(self):
self.result = []
self.lock = threading.Lock()
@ThreadingMask()
def runs(self, items, typeLimit, isNot=&quot;a&quot;):
for item in items:
def get_matched():
time.sleep(1)
if type(item) is typeLimit and str(item) != str(isNot):
with self.lock:
self.result.append(item)
get_matched()
if __name__ == &quot;__main__&quot;:
start = time.time()
a = Test()
b = Test()
bunchOfItems: list = [&quot;d&quot;, None, True, &quot;&quot;, 1, 2, 3, &quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;this&quot;, &quot;e&quot;, &quot;d&quot;, 13, &quot;p&quot;, 0.1, 11]
a.runs(bunchOfItems, str, isNot=&quot;b&quot;)
b.runs(bunchOfItems, str, isNot=&quot;b&quot;)
a.result.sort()
b.result.sort()
print(a.result)
print(b.result)

答案2

得分: 0

我认为我得到了答案,我没有在每次调用时重置self.threads = []。

根据[https://stackoverflow.com/questions/27575954/python-decorator-with-arguments-only-called-once],应该位于包装函数group_run之下。

def __call__(self, func):
    @wraps(func)
    def group_run(*args, **kwargs):
        self.threads = []
        if type(args[1]) is list:
            self.separate_groups(args[1])
        for group in self.groups:
            argsSep = list(args)
            argsSep[1] = group
            argsSep = tuple(argsSep)
            th = threading.Thread(target=func, daemon=True, args=argsSep, kwargs=kwargs)
            self.threads.append(th)
        for th in self.threads:
            th.start()
        for th in self.threads:
            th.join()
    return group_run
英文:

I think I got the answer, I didn't reset self.threads = [] at each calling.

And according to [https://stackoverflow.com/questions/27575954/python-decorator-with-arguments-only-called-once], it should be under the wrapped func group_run

    def __call__(self, func):
@wraps(func)
def group_run(*args, **kwargs):
self.threads = []
if type(args[1]) is list:
self.separate_groups(args[1])
for group in self.groups:
argsSep = list(args)
argsSep[1] = group
argsSep = tuple(argsSep)
th = threading.Thread(target=func, daemon=True, args=(argsSep), kwargs=(kwargs))
self.threads.append(th)
for th in self.threads:
th.start()
for th in self.threads:
th.join()
return group_run

huangapple
  • 本文由 发表于 2023年6月2日 11:38:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/76386972.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定