How do I create a task with Celery in Django? OOP issue?

Question

I'm trying to set up a task with Django and Celery. The Celery and Django configuration is fine; there is nothing to report on that side. However, I think the problem lies in how I wrote my OOP code, and I can't locate it. It looks like an argument problem: according to the worker, the task was given 4 arguments, but my function only takes 3. Here are my files, what I tried, and the error trace.

My files

helpers.py

I want to create a task for the run_test_optimization method.

class MachineLearniaXHelpers(MagicModels):
    def __init__(self, data_dict, target_name, type_model, test_size=0.33, random_state=42, **kwargs) -> None:
        super().__init__()
        self.test_size = test_size
        self.random_state = random_state
        self.process = DataProcessor(
            data=data_dict,
            target_name=target_name
        )
        self.metrics = MetricsScorer(type_model=type_model)
        self.model = self.several_algorythme(type_model, **kwargs)
    
    @staticmethod
    def split_data(data_dict):
        return train_test_split(*data_dict)
        
    @staticmethod   
    def train_model(model, X, y):
        model.fit(X, y)
    
    def run_test_optimization(self):
        dict_result = []
        for model in self.model:
            feature_model, values = self.process.transform_dict_to_array_structure()
            x_train, x_test, y_train, y_test = self.split_data(values)
            self.train_model(model, x_train, y_train)
            dict_metrics_train = self.metrics.choice_metrics_by_type_model(y_train, model.predict(x_train))
            dict_metrics_test = self.metrics.choice_metrics_by_type_model(y_test, model.predict(x_test))
            dict_result.append({
                "name_model": model.__class__.__name__,
                "features_model": feature_model,
                "train_performances": dict_metrics_train,
                "test_performances": dict_metrics_test
            })
        return dict_result

tasks.py

Here I create my task, task_run_test_optimization:

from celery import shared_task
from .helpers import MachineLearniaXHelpers

@shared_task
def task_run_test_optimization(data_dict, target_name, type_model):
    constructor = MachineLearniaXHelpers(
        data_dict,
        target_name,
        type_model
    )
    dict_result = constructor.run_test_optimization()
    return dict_result

views.py

The API's POST method, where the task task_run_test_optimization is called:

class MachineLearningXView(APIView):
    serializer_class = BuildModelMLSerializer
    permission_classes = [IsAuthenticated, UserPermissionMachineLearniaX]


    def post(self, request):
        if not self.request.session.exists(self.request.session.session_key):
            self.request.session.create()
        
        serializer = self.serializer_class(data=request.data)
        if serializer.is_valid():
            data = request.data.get('data')
            target = serializer.data.get('target_name')
            type_model = serializer.data.get('test_type')
            result = task_run_test_optimization.delay(data, target, type_model)  # the problem is here
            print(result)
            return Response(status=status.HTTP_200_OK)

Printing result shows a task ID in my console, and the POST request succeeds:

System check identified no issues (0 silenced).
February 08, 2023 - 17:25:28
Django version 4.1.5, using settings 'ialab.settings'
Starting development server at http://127.0.0.1:8000/
Quit the server with CTRL-BREAK.
829e66ee-925f-4aff-8c42-49420e5758ce <-- PRINT result IS HERE
[08/Feb/2023 17:25:47] "POST /api/machinelearniax HTTP/1.1" 200 0

But in my Celery worker, I get this error:

[2023-02-08 17:25:47,813: INFO/MainProcess] Task api.tasks.task_run_test_optimization[829e66ee-925f-4aff-8c42-49420e5758ce] received
[2023-02-08 17:25:47,897: ERROR/MainProcess] Task api.tasks.task_run_test_optimization[829e66ee-925f-4aff-8c42-49420e5758ce] raised unexpected: TypeError('task_run_test_optimization() takes 3 positional arguments but 4 were given')
Traceback (most recent call last):
  File "C:\Users\basti\Desktop\IALab\ialab\.ialabenv\lib\site-packages\celery\app\trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "C:\Users\basti\Desktop\IALab\ialab\.ialabenv\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
TypeError: task_run_test_optimization() takes 3 positional arguments but 4 were given

The task is received but does not run correctly.

Where is my error?

Answer 1

Score: 0

Possible case and solution #1:

You might initially have had task_run_test_optimization accepting 4 arguments. You started Celery, and it loaded this task and its signature into memory.

Then you might have changed the signature of task_run_test_optimization to take 3 arguments but forgotten to restart the Celery worker. The worker does not pick up code changes on its own, so the call sent from your view and the version of the task the worker is still running can disagree on the number of arguments.
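
To illustrate this first case, here is a minimal sketch in plain Python (no Celery involved) of how a mismatch between the arguments that are sent and the signature that is loaded produces this kind of TypeError. The function body and the sample arguments are placeholders, and the helper arguments_match is a hypothetical name introduced only for this illustration; it relies on the standard library's inspect.signature(...).bind(...) to test a call without running it.

```python
import inspect


# Placeholder standing in for the function the worker has loaded.
# The real task builds MachineLearniaXHelpers; here it just returns a string.
def task_run_test_optimization(data_dict, target_name, type_model):
    return "ok"


def arguments_match(func, args, kwargs=None):
    """Return True if func could be called with these arguments.

    Hypothetical helper: it binds the arguments to the signature
    without actually calling the function.
    """
    try:
        inspect.signature(func).bind(*args, **(kwargs or {}))
    except TypeError:
        return False
    return True


# Sample arguments (made up for the illustration).
three_args = ({"x": [1, 2, 3]}, "y", "regression")
four_args = three_args + ("extra",)

print(arguments_match(task_run_test_optimization, three_args))
print(arguments_match(task_run_test_optimization, four_args))

# Calling with one argument too many raises the same kind of error
# that appears in the worker log.
try:
    task_run_test_optimization(*four_args)
except TypeError as exc:
    message = str(exc)
    print(message)
```

If a check like this fails for the arguments your view sends, restarting the worker so that it reloads tasks.py is a reasonable first step.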

Possible case and solution #2:

Maybe you have two different tasks with the same name, "task_run_test_optimization", and your views.py imports the wrong one. Make sure you import the right one.
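
One way to check this second case is to look at where the imported object was defined and what arguments it takes. The sketch below uses only the standard library and two made-up functions that share a name; describe_callable, make_old_version, and make_new_version are hypothetical names for this illustration, and the extra self parameter in the "old" version is just an example of a differing signature. On a real Celery task, the registered name is also available through the task's name attribute.

```python
import inspect


def describe_callable(func):
    """Collect where a callable was defined and what arguments it takes."""
    return {
        "module": func.__module__,
        "qualname": func.__qualname__,
        "signature": str(inspect.signature(func)),
    }


# Two made-up functions that share a name but not a signature,
# standing in for two tasks defined in different places.
def make_old_version():
    def task_run_test_optimization(self, data_dict, target_name, type_model):
        return "old"
    return task_run_test_optimization


def make_new_version():
    def task_run_test_optimization(data_dict, target_name, type_model):
        return "new"
    return task_run_test_optimization


old_info = describe_callable(make_old_version())
new_info = describe_callable(make_new_version())

# Same function name, different definitions and signatures.
print(old_info["qualname"], old_info["signature"])
print(new_info["qualname"], new_info["signature"])
```

Running the same kind of inspection on the object imported in views.py shows which definition is actually in use.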

huangapple
  • Published on February 9, 2023, 00:52:22
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75389110.html