“threads can only be started once” 当再次调用带有多线程装饰器的方法时。

huangapple go评论220阅读模式
英文:

"threads can only be started once" when calling a method again with a multi-threading deco

问题

我创建了一个装饰器(deco),用于在多线程中运行多个具有相同方法的任务。

  1. import threading
  2. from functools import wraps
  3. class ThreadingMask:
  4. '''
  5. 一个装饰器,用于运行具有列表作为第一个参数的动态方法
  6. 将列表分割成多个列表(根据maxThreads),并在多线程中运行它们
  7. 在定义下有一个示例
  8. '''
  9. def __init__(self, maxThreads: int = 64):
  10. self.maxThreads = maxThreads
  11. self.groups: list[list] = []
  12. self.threads = []
  13. def separate_groups(self, items):
  14. # 初始化组
  15. i = 0
  16. while i < self.maxThreads:
  17. self.groups.append([])
  18. i += 1
  19. i = 0
  20. # 分离组
  21. for item in items:
  22. if i == self.maxThreads:
  23. i = 0
  24. self.groups[i].append(item)
  25. i += 1
  26. def __call__(self, func):
  27. @wraps(func)
  28. def group_run(*args, **kwargs):
  29. if type(args[1]) is list:
  30. self.separate_groups(args[1])
  31. for group in self.groups:
  32. argsSep = list(args)
  33. argsSep[1] = group
  34. argsSep = tuple(argsSep)
  35. th = threading.Thread(target=func, args=argsSep, kwargs=kwargs)
  36. self.threads.append(th)
  37. for th in self.threads:
  38. th.start()
  39. for th in self.threads:
  40. th.join()
  41. return group_run
  1. import time
  2. class Test:
  3. '''一个示例'''
  4. def __init__(self):
  5. self.result = []
  6. self.lock = threading.Lock()
  7. @ThreadingMask()
  8. def runs(self, items, typeLimit, isNot="a"):
  9. for item in items:
  10. def get_matched():
  11. time.sleep(1)
  12. if type(item) is typeLimit and str(item) != str(isNot):
  13. self.lock.acquire()
  14. self.result.append(item)
  15. self.lock.release()
  16. get_matched()
  17. if __name__ == "__main__":
  18. start = time.time()
  19. a = Test()
  20. bunchOfItems = ["d", None, True, "", 1, 2, 3, "a", "b", "c", "this", "e", "d", 13, "p", 0.1, 11]
  21. a.runs(bunchOfItems, str, isNot="b")
  22. a.result.sort()
  23. print(a.result)
  24. b = Test()
  25. b.runs(bunchOfItems, str, isNot="b")

但是,当第二次调用具有此装饰器的方法时,会出现 RuntimeError: threads can only be started once。有没有一种方法可以在第二次调用方法时启动不同的线程,而不是由第一次调用启动的线程?

英文:

I created a decorator that runs the same method over a batch of items in multiple threads.

  1. import threading
  2. from functools import wraps
  3. class ThreadingMask:
  4. &#39;&#39;&#39;
  5. a deco to run dymastic method which has a list as first arg
  6. will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
  7. there is an example under defination
  8. &#39;&#39;&#39;
  9. def __init__(self, maxThreads: int = 64):
  10. self.maxThreads = maxThreads
  11. self.groups: list[list] = []
  12. self.threads = []
  13. def separate_groups(self, items):
  14. # init groups
  15. i = 0
  16. while i &lt; self.maxThreads:
  17. self.groups.append([])
  18. i += 1
  19. i = 0
  20. # separate groups
  21. for item in items:
  22. if i == self.maxThreads:
  23. i = 0
  24. self.groups[i].append(item)
  25. i += 1
  26. def __call__(self, func):
  27. @wraps(func)
  28. def group_run(*args, **kwargs):
  29. if type(args[1]) is list:
  30. self.separate_groups(args[1])
  31. for group in self.groups:
  32. argsSep = list(args)
  33. argsSep[1] = group
  34. argsSep = tuple(argsSep)
  35. th = threading.Thread(target=func, args=(argsSep), kwargs=(kwargs))
  36. self.threads.append(th)
  37. for th in self.threads:
  38. th.start()
  39. for th in self.threads:
  40. th.join()
  41. return group_run
  1. import time
  2. class Test:
  3. &#39;&#39;&#39;just a sample&#39;&#39;&#39;
  4. def __init__(self):
  5. self.result = []
  6. self.lock = threading.Lock()
  7. @ThreadingMask()
  8. def runs(self, items, typeLimit, isNot=&quot;a&quot;):
  9. for item in items:
  10. def get_matched():
  11. time.sleep(1)
  12. if type(item) is typeLimit and str(item) != str(isNot):
  13. self.lock.acquire()
  14. self.result.append(item)
  15. self.lock.release()
  16. get_matched()
  17. if __name__ == &quot;__main__&quot;:
  18. start = time.time()
  19. a = Test()
  20. bunchOfItems: list = [&quot;d&quot;, None, True, &quot;&quot;, 1, 2, 3, &quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;this&quot;, &quot;e&quot;, &quot;d&quot;, 13, &quot;p&quot;, 0.1, 11]
  21. a.runs(bunchOfItems, str, isNot=&quot;b&quot;)
  22. a.result.sort()
  23. print(a.result)
  24. b = Test()
  25. b.runs(bunchOfItems, str, isNot=&quot;b&quot;)

But `RuntimeError: threads can only be started once` is raised the second time a method with this decorator is called.
Is there a way to start new threads on the second call instead of reusing the ones started by the first call?

答案1

得分: 1

你需要在装饰器对象(ThreadingMask)上创建线程的新实例,以便在单独的线程中运行,而不是尝试两次启动相同的线程(你正在使用相同的self.threads对象,而你应该在函数上创建新线程)。

修改后的代码示例:

  1. import threading
  2. import time
  3. from functools import wraps
  4. class ThreadingMask:
  5. '''
  6. a deco to run dynamic method which has a list as the first arg
  7. will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
  8. there is an example under definition
  9. '''
  10. def __init__(self, maxThreads: int = 64):
  11. self.maxThreads = maxThreads
  12. def separate_groups(self, items):
  13. # init groups
  14. groups = [[] for _ in range(self.maxThreads)]
  15. # separate groups
  16. for i, item in enumerate(items):
  17. groups[i % self.maxThreads].append(item)
  18. return groups
  19. def __call__(self, func):
  20. @wraps(func)
  21. def group_run(*args, **kwargs):
  22. if type(args[1]) is list:
  23. groups = self.separate_groups(args[1])
  24. else:
  25. groups = [[]]
  26. threads = []
  27. for group in groups:
  28. args_sep = list(args)
  29. args_sep[1] = group
  30. args_sep = tuple(args_sep)
  31. th = threading.Thread(target=func, args=args_sep, kwargs=kwargs)
  32. threads.append(th)
  33. for th in threads:
  34. th.start()
  35. for th in threads:
  36. th.join()
  37. return group_run
  38. class Test:
  39. '''just a sample'''
  40. def __init__(self):
  41. self.result = []
  42. self.lock = threading.Lock()
  43. @ThreadingMask()
  44. def runs(self, items, typeLimit, isNot="a"):
  45. for item in items:
  46. def get_matched():
  47. time.sleep(1)
  48. if type(item) is typeLimit and str(item) != str(isNot):
  49. with self.lock:
  50. self.result.append(item)
  51. get_matched()
  52. if __name__ == "__main__":
  53. start = time.time()
  54. a = Test()
  55. b = Test()
  56. bunchOfItems = ["d", None, True, "", 1, 2, 3, "a", "b", "c", "this", "e", "d", 13, "p", 0.1, 11]
  57. a.runs(bunchOfItems, str, isNot="b")
  58. b.runs(bunchOfItems, str, isNot="b")
  59. a.result.sort()
  60. b.result.sort()
  61. print(a.result)
  62. print(b.result)

这是修改后的代码示例。

英文:

You have to create new Thread objects on every call instead of storing them on the decorator object (ThreadingMask). A Thread can be started only once, and the decorator instance is shared by every call of the decorated method, so the threads kept in self.threads from the first call were being started a second time. Build the list of threads inside the wrapper function instead.

Modified code sample :

  1. import threading
  2. import time
  3. from functools import wraps
  4. class ThreadingMask:
  5. &#39;&#39;&#39;
  6. a deco to run dynamic method which has a list as the first arg
  7. will separate the list into several lists (according to maxThreads) and run them with the method in multi-threads
  8. there is an example under definition
  9. &#39;&#39;&#39;
  10. def __init__(self, maxThreads: int = 64):
  11. self.maxThreads = maxThreads
  12. def separate_groups(self, items):
  13. # init groups
  14. groups = [[] for _ in range(self.maxThreads)]
  15. # separate groups
  16. for i, item in enumerate(items):
  17. groups[i % self.maxThreads].append(item)
  18. return groups
  19. def __call__(self, func):
  20. @wraps(func)
  21. def group_run(*args, **kwargs):
  22. if type(args[1]) is list:
  23. groups = self.separate_groups(args[1])
  24. else:
  25. groups = [[]]
  26. threads = []
  27. for group in groups:
  28. args_sep = list(args)
  29. args_sep[1] = group
  30. args_sep = tuple(args_sep)
  31. th = threading.Thread(target=func, args=args_sep, kwargs=kwargs)
  32. threads.append(th)
  33. for th in threads:
  34. th.start()
  35. for th in threads:
  36. th.join()
  37. return group_run
  38. class Test:
  39. &#39;&#39;&#39;just a sample&#39;&#39;&#39;
  40. def __init__(self):
  41. self.result = []
  42. self.lock = threading.Lock()
  43. @ThreadingMask()
  44. def runs(self, items, typeLimit, isNot=&quot;a&quot;):
  45. for item in items:
  46. def get_matched():
  47. time.sleep(1)
  48. if type(item) is typeLimit and str(item) != str(isNot):
  49. with self.lock:
  50. self.result.append(item)
  51. get_matched()
  52. if __name__ == &quot;__main__&quot;:
  53. start = time.time()
  54. a = Test()
  55. b = Test()
  56. bunchOfItems: list = [&quot;d&quot;, None, True, &quot;&quot;, 1, 2, 3, &quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;this&quot;, &quot;e&quot;, &quot;d&quot;, 13, &quot;p&quot;, 0.1, 11]
  57. a.runs(bunchOfItems, str, isNot=&quot;b&quot;)
  58. b.runs(bunchOfItems, str, isNot=&quot;b&quot;)
  59. a.result.sort()
  60. b.result.sort()
  61. print(a.result)
  62. print(b.result)

答案2

得分: 0

我认为我得到了答案,我没有在每次调用时重置self.threads = []。

根据[https://stackoverflow.com/questions/27575954/python-decorator-with-arguments-only-called-once],应该位于包装函数group_run之下。

  1. def __call__(self, func):
  2. @wraps(func)
  3. def group_run(*args, **kwargs):
  4. self.threads = []
  5. if type(args[1]) is list:
  6. self.separate_groups(args[1])
  7. for group in self.groups:
  8. argsSep = list(args)
  9. argsSep[1] = group
  10. argsSep = tuple(argsSep)
  11. th = threading.Thread(target=func, daemon=True, args=argsSep, kwargs=kwargs)
  12. self.threads.append(th)
  13. for th in self.threads:
  14. th.start()
  15. for th in self.threads:
  16. th.join()
  17. return group_run
英文:

I think I found the answer: I wasn't resetting self.threads = [] on each call.

According to [https://stackoverflow.com/questions/27575954/python-decorator-with-arguments-only-called-once], the reset has to go inside the wrapper function group_run, because the decorator object itself is created only once, when the method is defined.

  1. def __call__(self, func):
  2. @wraps(func)
  3. def group_run(*args, **kwargs):
  4. self.threads = []
  5. if type(args[1]) is list:
  6. self.separate_groups(args[1])
  7. for group in self.groups:
  8. argsSep = list(args)
  9. argsSep[1] = group
  10. argsSep = tuple(argsSep)
  11. th = threading.Thread(target=func, daemon=True, args=(argsSep), kwargs=(kwargs))
  12. self.threads.append(th)
  13. for th in self.threads:
  14. th.start()
  15. for th in self.threads:
  16. th.join()
  17. return group_run

huangapple
  • 本文由 发表于 2023年6月2日 11:38:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/76386972.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定