如何在同一Laravel队列中只运行一个作业?

huangapple go评论101阅读模式
英文:

How to run only one job at time in same laravel queue?

问题

我有一个与API一起工作的SAAS服务。它有限制,所以我需要一个用户帐户在同一时间只能执行一个请求。

为此,我使用 OnQueue($user->name); 进行排队,

然后在 handle() 中执行作业...

我只需要一个作业可以在用户队列中运行。同时,每个队列只能运行一个不同的队列1个作业。

我正在使用Redis连接。

这是我的作业类:

  1. public function __construct(Accounts $acc)
  2. {
  3. $this->acc = $acc;
  4. $this->ownjob = $acc->prepareJobQueue();
  5. }
  6. public function handle()
  7. {
  8. $acc = $this->acc;
  9. $job = $this->ownjob;
  10. $api = new Api([
  11. 'login' => $acc->login,
  12. 'password' => $acc->password,
  13. ]);
  14. if ($api->checkLogin()) {
  15. info("{$acc->login} OK Authorized");
  16. foreach ($job['queue'] as $term) {
  17. switch($term['type']) {
  18. case 'hashtag':
  19. info("{$acc->login} Queuing: type - {$term['type']}, value - {$term['value']}");
  20. $hashtag = Hashtags::where('cha_name',$term['value'])->first();
  21. $answer = $api->getUsersByHashtag($hashtag,50);
  22. break;
  23. case 'concurency':
  24. info("{$acc->login} Queuing: type - {$term['type']}, value - {$term['value']}");
  25. $soc_user = Users::where('soc_unique_id',$term['value'])->first();
  26. $answer = $api->getUserFollowers($soc_user);
  27. break;
  28. default:
  29. break;
  30. }
  31. }
  32. } else {
  33. info("{$acc->login} NOT Authorized - STOP JOB");
  34. }
  35. }

这是我如何分派作业的方式:

  1. $accounts = Accounts::select(['id', 'login', 'hashtag_filter', 'concurency_filter'])->whereNotNull('hashtag_filter')->get();
  2. foreach ($accounts as $acc) {
  3. doFollowing::dispatch($acc)->onQueue($acc->login);
  4. }

如果需要进一步的帮助,请告诉我。

英文:

I have saas service which working with API. It has limits so I need that one user account doing only one request at the same time.

For this I queuing with OnQueue($user->name);

then in handle() doing job...

I need only one job can be run in users queue. At the same time may be run only diffent queues 1 job per 1 queue.

Im using redis connection.

This my job class:

  1. public function __construct(Accounts $acc)
  2. {
  3. $this->acc = $acc;
  4. $this->ownjob = $acc->prepareJobQueue();
  5. }
  6. public function handle()
  7. {
  8. $acc = $this->acc;
  9. $job = $this->ownjob;
  10. $api = new Api([
  11. 'login' => $acc->login,
  12. 'password' => $acc->password,
  13. ]);
  14. if ($api->checkLogin()) {
  15. info("{$acc->login} OK Authorized");
  16. foreach ($job['queue'] as $term) {
  17. switch($term['type']) {
  18. case 'hashtag':
  19. info("{$acc->login} Queuing: type - {$term['type']}, value - {$term['value']}");
  20. $hashtag = Hashtags::where('cha_name',$term['value'])->first();
  21. $answer = $api->getUsersByHashtag($hashtag,50);
  22. break;
  23. case 'concurency':
  24. info("{$acc->login} Queuing: type - {$term['type']}, value - {$term['value']}");
  25. $soc_user = Users::where('soc_unique_id',$term['value'])->first();
  26. $answer = $api->getUserFollowers($soc_user);
  27. break;
  28. default:
  29. break;
  30. }
  31. }
  32. } else {
  33. info("{$acc->login} NOT Authorized - STOP JOB");
  34. }
  35. }

This is how I dispatching job:

  1. $accounts = Accounts::select(['id', 'login', 'hashtag_filter', 'concurency_filter'])->whereNotNull('hashtag_filter')->get();
  2. foreach ($accounts as $acc) {
  3. doFollowing::dispatch($acc)->onQueue($acc->login);
  4. }

答案1

得分: 3

你可以使用Laravel内置的速率限制功能来实现这个(注意需要Redis)。

在你的作业中:

  1. Redis::funnel('process-name')->limit(1)->then(function () {
  2. // 这里放置你的作业逻辑
  3. });

注意,如果你不提供then方法的第二个回调函数,如果无法获取锁定,它将抛出异常(这将导致作业失败)。

如果你使用的是Laravel 6或更高版本,你还可以选择在作业中间件中执行此操作,而不是在作业本身中执行(如果有多个共享相同锁定的作业,这很方便)。

有关Laravel的速率限制的更多信息,请参阅:https://laravel.com/docs/5.8/queues#rate-limiting

英文:

You could use Laravel's builtin rate limiting for this (note that it does require Redis).

Inside your job:

  1. Redis::funnel('process-name')->limit(1)->then(function () {
  2. // Your job logic here
  3. });

Note that if you don't provide a second callback to then, it will throw an exception if it cannot obtain the lock (which will cause your job to fail)

If you're using Laravel 6 or higher you could also opt to do this in a job middleware, rather than in the job itself (handy if you have multiple jobs that share the same lock)

More info on Laravel's rate limiting: https://laravel.com/docs/5.8/queues#rate-limiting

答案2

得分: 2

###使用 mxl/laravel-queue-rate-limit Composer 包。

它使您能够在特定队列上限制 Laravel 作业的速率,而不使用第三方驱动程序,如 Redis

  1. 使用以下命令安装它:

    1. $ composer require mxl/laravel-queue-rate-limit:^1.0
  2. 该包与 Laravel 5.5+ 兼容,并使用 [自动发现][1] 功能将 MichaelLedin\LaravelQueueRateLimit\QueueServiceProvider::class 添加到提供程序列表中。

  3. config/queue.php 中添加速率限制(每秒 x 个作业)设置:

    1. 'rateLimit' => [
    2. 'mail' => [
    3. 'allows' => 1, // 作业数量
    4. 'every' => 5 // 时间间隔(秒)
    5. ]
    6. ]

这些设置允许在 mail 队列上每 5 秒运行 1 个作业。确保默认队列驱动程序(config/queue.php 中的 default 属性)设置为除 sync 以外的任何值。

  1. 使用 --queue mail 选项运行队列工作者:

    1. $ php artisan queue:work --queue mail

您可以在多个队列上运行工作者,但只有在 rateLimit 设置中引用的队列才会受到速率限制:

  1. $ php artisan queue:work --queue mail,default

default 队列上的作业将不受速率限制。

  1. 将一些作业加入队列以测试速率限制:

    1. SomeJob::dispatch()->onQueue('mail');
    2. SomeJob::dispatch()->onQueue('mail');
    3. SomeJob::dispatch()->onQueue('mail');
    4. SomeJob::dispatch();
英文:

###Use mxl/laravel-queue-rate-limit Composer package.

It enables you to rate limit Laravel jobs on specific queue without using A third party driver such as Redis.

  1. Install it with:

    1. $ composer require mxl/laravel-queue-rate-limit:^1.0
  2. This package is compatible with Laravel 5.5+ and uses [auto-discovery][1] feature to add MichaelLedin\LaravelQueueRateLimit\QueueServiceProvider::class to providers.

  3. Add rate limit (x number of jobs per y seconds) settings to config/queue.php:

    1. 'rateLimit' => [
    2. 'mail' => [
    3. 'allows' => 1, // number of jobs
    4. 'every' => 5 // time interval in seconds
    5. ]
    6. ]

These settings allow to run 1 job every 5 seconds on mail queue.
Make sure that default queue driver (default property in config/queue.php) is set to any value except sync.

  1. Run queue worker with --queue mail option:

    1. $ php artisan queue:work --queue mail

You can run worker on multiple queues, but only queues referenced in rateLimit setting will be rate limited:

  1. $ php artisan queue:work --queue mail,default
  2. Jobs on `default ` queue will be executed without rate limiting.
  1. Queue some jobs to test rate limiting:

    1. SomeJob::dispatch()->onQueue('mail');
    2. SomeJob::dispatch()->onQueue('mail');
    3. SomeJob::dispatch()->onQueue('mail');
    4. SomeJob::dispatch();

答案3

得分: 1

你可以在你的Supervisor或Horizon设置中限制每个队列的numprocs

如果你只为每个用户生成一个队列工作进程,我相信你会得到你想要的行为。

英文:

You could limit numprocs per queue in your Supervisor or Horizon setup.

If you only spawn one queue worker per user I believe you will get your desired behaviour.

huangapple
  • 本文由 发表于 2020年1月3日 18:06:42
  • 转载请务必保留本文链接:https://go.coder-hub.com/59576597.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定