Queues
Table of Contents
Introduction
あなたの web application を構築する際に、アップロードされた CSV ファイルの解析や格納など、一般的な web request の間に実行するには時間がかかりすぎるタスクがいくつかあるかもしれません。幸いなことに、 Laravel は、バックグラウンドで処理できるキューに入った jobs を簡単に作成することを可能にしてくれます。時間を要するタスクを queue に移すことで、あなたの application は驚異的な速さで web リクエストに対応し、ユーザーにより良い user 体験を提供することができます。
Laravel queues は、Amazon SQS 、Redis 、あるいは関係 types database など、さまざまな queue バックエンドを通じて統一されたキューイング API を提供します。
Laravel の queue 設定 options は、アプリケーションのconfig/queue.php
設定ファイルに保存されます。このファイルには、フレームワークに含まれる各 queue ドライバのための connection 設定があり、 database 、Amazon SQS 、Redis 、およびBeanstalkd ドライバ、さらには jobs を即時に実行する同期 driver (ローカル開発中に使用)が含まれています。また、キューに追加した jobs を破棄するnull
queue driver も含まれています。
NOTE
今、 Laravel は Horizon を提供しています。これは、 Redis で動作する queues に対する美しいダッシュボードと設定システムです。詳細については、Horizon documentationをご覧ください。
接続 vs. Queues
Laravel queues を始める前に、 "connections" と "queues" の違いを理解することが重要です。config/queue.php
設定ファイルには、connections
設定 array があります。このオプションは、Amazon SQS、Beanstalk、または Redis などのバックエンド queue services への接続を定義します。ただし、任意の queue connection は、異なるスタックやキューの山と考えられる複数の "queues" を持つことができます。
queue
設定ファイルの各 connection 設定例には、queue
の attribute が含まれていることに注意してください。これは、 jobs が特定の connection に送信されるときにディスパッチされる default queue です。つまり、どの queue にディスパッチするかを明示的に定義せずに job を dispatch すると、その job はqueue
の attribute で定義された queue に配置されます、という connection の設定です:
use App\Jobs\ProcessPodcast;
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');
一部の applications では、複数の queues に jobs をプッシュする必要が一切なく、代わりに一つのシンプルな queue を持つことを好むかもしれません。しかし、jobs を複数の queues にプッシュすることは、jobs の処理を優先順位付けしたり、分割したりしたい applications にとって特に有用で、Laravel queue ワーカーは優先順位で処理すべき queues を指定することを可能にします。例えば、high
の queue に jobs をプッシュしたら、それらにより高い processing 優先順位を与えるワーカーを実行することができます:
php artisan queue:work --queue=high,default
Driver のメモと前提条件
Database
database
の queue driver を使用するためには、jobs を保持するための database テーブルが必要です。通常、これは Laravel の default 0001_01_01_000002_create_jobs_table.php
のdatabase マイグレーションに含まれています。しかし、あなたの application にこのマイグレーションが含まれていない場合は、make:queue-table
の Artisan command を使用して作成することができます。
php artisan make:queue-table
php artisan migrate
Redis
redis
の queue driver を使用するためには、config/database.php
設定ファイルで Redis database connection を設定する必要があります。
WARNING
serializer
およびcompression
Redis options は、redis
queue driver ではサポートされていません。
Redis クラスター
あなたの Redis queue connection が Redis クラスターを使用している場合、あなたの queue 名には key hash tag を含める必要があります。これは、指定された queue のすべての Redis keys が同じ hash スロットに配置されることを確保するために required です。
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
Blocking
Redis queue を使用する際、block_for
設定 option を使用して、driver がワーカーループを反復し、Redisdatabase を再ポーリングする前に job が利用可能になるまでどのくらい待つべきかを指定することができます。
この value を queue の負荷に基づいて調整することは、新しい jobs を探すために Redis database を継続的にポーリングするよりも効率的です。例えば、driver が job を待機する間に 5 秒間ブロックするように value を 5
に設定できます。
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
WARNING
block_for
を0
に設定すると、 queue の作業者は、 job が利用可能になるまで無期限にブロックされます。これにより、次の job が処理されるまで、SIGTERM
などのシグナルが処理されなくなります。
その他の Driver に必要な条件
次に示す依存関係は、リストされた queue ドライバに必要です。これらの依存関係は Composer パッケージマネージャーを介してインストールできます。
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
または phpRedis PHP 拡張
Creating Jobs
Job クラスの生成
default によると、あなたの application のすべての queue 可能な jobs は app/Jobs
ディレクトリに保存されます。もし app/Jobs
ディレクトリが存在しない場合、make:job
Artisan command を実行した時に作成されます。
php artisan make:job ProcessPodcast
生成された class は Illuminate\Contracts\Queue\ShouldQueue
インターフェースを実装します。これは Laravel に対して、 job が非同期に実行するために queue にプッシュされるべきであることを示しています。
NOTE
Job スタブは スタブの公開を使用してカスタマイズすることができます。
Class の構造
Job クラスは非常にシンプルで、通常はhandle
method のみを含み、このメソッドは job が queue によって処理される時に呼び出されます。始めるには、例の job class を見てみましょう。この例では、ポッドキャスト公開 service を管理していると仮定し、公開前にアップロードされたポッドキャストファイルを process する必要があるとします:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}
この例では、Eloquent modelsを queue に入れられたジョブのコンストラクタに直接渡すことができたことに注目してください。SerializesModels
という特性を使っているため、job によって、Eloquent models とそれらがロードしたリレーションシップは、job が処理される際にスムーズにシリアライズおよびアンシリアライズされます。
キューに追加された job がコンストラクタで Eloquent model を受け取る場合、queue には model の識別子だけがシリアライズされます。実際に job が処理されると、queue システムは database からフルの model インスタンスとそのロードされたリレーションシップを自動的に再取得します。この model シリアライゼーションのアプローチにより、queue driver に送信される job ペイロードが大幅に小さくなります。
handle
Method Dependency Injection
handle
method は、 job が queue によって処理されたときに呼び出されます。我々は、 job のhandle
method に対する依存関係を型ヒントとして指定することができることに注意してください。 Laravel service containerは自動的にこれらの依存関係を注入します。
コンテナが依存関係を handle
method に注入する方法を完全に制御したい場合は、コンテナの bindMethod
method を使用することができます。bindMethod
method は、 job とコンテナを受け取るコールバックを受け付けます。コールバック内では、handle
method を自由に呼び出すことができます。通常、App\Providers\AppServiceProvider
の boot
method からこの method を呼び出すべきです。service provider:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
WARNING
バイナリ data、たとえば生の image の内容は、 Binary data に待機している job に渡す前に
base64_encode
関数を通すべきです。そうでなければ、その job は queue に置かれる際に JSON に適切にシリアライズされないかもしれません。
キューの関係
すべてのロードされた Eloquent model の関連性も、 job がキューに積まれるときにシリアライズされるため、シリアライズされた job string はかなり大きくなることがあります。さらに、 job が逆シリアライズされて model の関連性が database から再取得されると、それらは完全に retrieved されます。 job のキューイング process 中にシリアライズされた model に以前に適用されていた関連性の制約は、 job が逆シリアライズされるときには適用されません。したがって、特定の関連性の一部分で作業したい場合は、キューに積まれた job 内でその関連性を再制約する必要があります。
または、リレーションがシリアライズされるのを防ぐために、プロパティ value を設定する際に model にwithoutRelations
method を呼び出すことができます。この method はロードされたリレーションシップなしの model のインスタンスを返します:
/**
* Create a new job instance.
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
あなたが PHP コンストラクタプロパティプロモーションを使用しており、 Eloquent model がその関連性をシリアライズしないように指示したい場合、WithoutRelations
attribute を使用することができます:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast
) {
}
もし job が single model の代わりに collection または array の Eloquent models を受け取る場合、その collection 内の models は、 job がデシリアル化され実行されたときに関係性が復元されません。これは、大量の models を扱う jobs における過度の resource 使用を防ぐためです。
Unique Jobs
WARNING
uniquejobs は、ロックをサポートする cachedrivers が必要です。現在、
memcached
、redis
、dynamodb
、database
、file
及びarray
cachedrivers はアトミックロックをサポートしています。また、uniquejobs の制約は batch 内の jobs には適用されません。
時々、特定の job がある時点で queue 上に一つだけ存在することを確認したい場合があります。その場合、ShouldBeUnique
インターフェースを job class に実装することにより実現できます。このインターフェースは、あなたの class に追加のメソッドを定義する必要はありません:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
上記の例では、UpdateSearchIndex
の job は unique です。したがって、job は、同じ job の別のインスタンスが既に queue にあり、処理がまだ終了していない場合には dispatch されません。
特定のケースでは、 job unique を作る特定の"key"を定義したい場合や、 job が unique のままでいられない timeout を指定したい場合があります。これを実現するために、uniqueId
やuniqueFor
のプロパティまたはメソッドを job class に定義できます:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
上記の例では、UpdateSearchIndex
job は製品 ID によって unique です。ですから、同じ製品 ID を持つ新たな job の dispatches は、既存の job の処理が完了するまで無視されます。さらに、既存の job が 1 時間以内に処理されない場合、unique ロックは解除され、同じ uniquekey を持つ別の job が queue に dispatches することが可能になります。
WARNING
あなたの application が複数の web サーバーやコンテナから jobs をディスパッチしている場合、すべてのサーバーが同じ中央の cache サーバーと通信していることを確認するべきです。そうすれば Laravel が job が unique であるかどうかを正確に判断することができます。
Jobs Unique を Processing が始まるまで保持する
default では、unique job は job が処理を終了するか、すべてのリトライ試行に失敗した後に アンロックされます。ただし、job が処理される前にすぐにアンロックしたい場合もあるかもしれません。それを実現するためには、job はShouldBeUnique
コントラクトの代わりにShouldBeUniqueUntilProcessing
コントラクトを実装するべきです。
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
Unique Job ロック
裏側では、ShouldBeUnique
の job が dispatch されると、 Laravel はuniqueId
key を用いてロックを取得しようとします。もしロックが取得できなければ、その job は dispatch されません。このロックは、job が処理を完了するか、全てのリトライの試行が失敗すると release されます。default では、Laravel はこのロックを取得するために defaultcache ドライバを使用します。しかし、ロック取得のために別の driver を使用したい場合は、使用すべき cache ドライバを返すuniqueVia
の method を定義することができます:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
NOTE
job の同時処理数を制限するだけであれば、代わりに
WithoutOverlapping
ジョブ middleware を使用してください。
暗号化された Jobs
Laravel を使用すると、encryptionを介してジョブの data のプライバシーと integrity を確保できます。始めるには、単に ShouldBeEncrypted
インターフェースを job class に追加します。このインターフェースが class に追加されると、 Laravel は自動的にあなたの job を暗号化し、それを queue にプッシュします:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
Job Middleware
Job middleware は、キューに入れられた jobs の実行に対して custom ロジックを適用し、それ自体の jobs での定型的なコードを減らすことができます。例えば、以下のhandle
method は、Laravel の Redis rate limiting features を利用して、5 秒ごとに 1 つの job だけを process させるようにします:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}
この code は有効ですが、handle
method の実装は Redis rate limiting ロジックが散見されるために騒々しいものとなります。さらに、この rate limiting ロジックは、レート制限を行いたい他の jobs のためにも複製しなければなりません。
handle method での rate limiting の代わりに、 job middleware を定義して rate limiting を処理することができます。 Laravel には job middleware の default の位置がないため、あなたの application のどこにでも job middleware を配置しても構いません。この例では、app/Jobs/Middleware
ディレクトリに middleware を配置します。
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
ご覧の通り、route middlewareと同様に、 job middleware は処理中の job と、その job の processing を続行するために呼び出される必要があるコールバックを受け取ります。
job middleware を作成した後、それらは middleware
method から返すことで job に関連付けることができます。 この method は make:job
Artisan command で設営した jobs には存在しないため、手動で job class に追加する必要があります。
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
NOTE
Job middleware は、キュー可能な event listeners 、mailables、および notifications にも割り当てることができます。
Rate Limiting
私たちはちょうどあなた自身の rate limiting job middleware の書き方をデモンストレーションしましたが、実際には Laravel はあなたが jobs のレート制限に使うことのできる rate limiting middleware を含みます。route rate limitersのように、 job rate limiters は、RateLimiter
ファサードの for
method を使って定義されます。
たとえば、 users が自身の data を 1 時間に 1 回バックアップできるように許可したい場合や、プレミアム顧客にはそのような制限を課さずに許可したい場合があります。これを達成するためには、AppServiceProvider
の method であるboot
にRateLimiter
を定義することができます。
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
上記の例では、時間単位のレート制限を定義しましたが、perMinute
method を使用して分単位でのレート制限を簡単に定義できます。さらに、レート制限のby
method に任意の value を渡すことができます。ただし、この value は最もよく、顧客ごとのレート制限を区分するために使用されます:
return Limit::perMinute(50)->by($job->user->id);
あなたが一度レート制限を定義したら、Illuminate\Queue\Middleware\RateLimited
middleware を使用してそのレートリミッターをあなたの job に attach することができます。 job がレート制限を超えるたびに、この middleware は job をレート制限の期間に基づいた適切な delay で queue に再び release します。
use Illuminate\Queue\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
レート制限された job を queue に戻すと、その job の attempts
の総数は増加します。tries
や maxExceptions
のプロパティを jobclass に合わせて調整することをお勧めします。または、retryUntil
methodを使用して、job を試行しない時間を定義することもお勧めします。
もし job がレート制限に達した際に再試行させたくない場合、dontRelease
method を使用することができます。
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
NOTE
Redis を使用している場合、
Illuminate\Queue\Middleware\RateLimitedWithRedis
middleware を使用できます。これは Redis に最適化されており、基本的な rate limiting middleware よりも効率的です。
Job の重複を防ぐ
Laravel には、任意の key に基づいて job の重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlapping
middleware が含まれています。これは、一度に一つの job だけが変更すべき“ resource を変更する待機中の“ job がある場合に役立つことがあります。
例えば、ユーザーのクレジットスコアを更新する job が queue に入っていて、同じ user ID に対するクレジットスコアの update job が重複するのを防ぎたいとします。これを達成するためには、ジョブのmiddleware
method からWithoutOverlapping
middleware を返せばよいです。
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
同じ type の重複する jobs は、 queue に戻されます。また、リリースされた job が再度試みられるまでの秒数も指定することができます。
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
あなたが、重複する jobs をすぐに delete して再試行されないようにしたい場合は、dontRelease
method を使用することができます:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping
という middleware は、Laravel のアトミックロック機能によって機能します。時折、あなたの job が予期せずに失敗したりタイムアウトしたりするとロックが解除されないことがあります。したがって、expireAfter
method を使用して、明示的にロックの有効期限を設定することができます。例えば、以下の例では、Laravel に job が処理を開始してから 3 分後にWithoutOverlapping
ロックを解放するように指示します:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
WARNING
WithoutOverlapping
middleware は、ロックをサポートする cache driver が必要です。現在、memcached
、redis
、dynamodb
、database
、file
、array
の cache drivers がアトミックロックをサポートしています。
Job クラス間でのロックの Keys 共有
default では、WithoutOverlapping
middleware は同じ class の job のみがオーバーラップしないようにします。したがって、2 つの異なる jobclass が同じロック key を使用しても、それらがオーバーラップするのを防ぐことはありません。ただし、shared
method を使用して、Laravel に jobclass 全体に key を適用するよう指示することができます。
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
スロットリング Exceptions
Laravel には、Illuminate\Queue\Middleware\ThrottlesExceptions
という middleware が含まれており、これにより例外のスロットリングを行うことができます。一旦 job が一定の数の例外を throws すると、その後の job の実行は、指定された時間間隔が経過するまで遅延します。この middleware は、不安定なサードパーティの services と対話する job に特に有用です。
たとえば、queue に入った job が、例外を throw し始めるサードパーティーの API とやり取りするとしましょう。これらの例外をスロットル(調整)するために、あなたはThrottlesExceptions
middleware をあなたのジョブのmiddleware
method から返すことができます。通常、この middleware は時間ベースの試行を実装する job と組み合わせて使用すべきです。
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5)];
}
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(5);
}
middleware が受け入れる最初のコンストラクタ引数は、job が throw できる exceptions の数であり、2 つめのコンストラクタ引数は、job がスロットルされた後、再試行するまでの経過すべき分数です。上記の code 例で、job が 5 分以内に 10 の exceptions を throw した場合、job を再試行するまで 5 分待つことになります。
job が例外を throws したが、例外の閾 values にはまだ達していない場合、その job は通常すぐに再試行されます。しかし、その job が遅延すべき分数を指定することができます。これは、 middleware を job に attach するときに backoff
method を呼び出すことで行います。
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}
内部的に、この middleware は Laravel の cache システムを使用して rate limiting を実装し、ジョブの class 名が cache の key として使用されます。このキーは、by
method を使用して middleware をあなたの job に添付するときに上書きすることができます。これは、同じサードパーティの service とやりとりをする複数の jobs を持ち、それらが一般的なスロットリング bucket を共有したい場合に役立つかもしれません。
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->by('key')];
}
default では、この middleware はすべての例外をスロットルします。この挙動を変更するには、when
method を用いて middleware をあなたの job に attach する際に呼び出します。その結果、when
method がtrue
を返す場合にのみ例外がスロットルされます。
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
例外( exceptions )をアプリケーションの例外( handler )に報告したい場合は、 middleware を job にアタッチする際に report
method を呼び出すことで実現できます。オプションとして、report
method にクロージャを提供することも可能で、その場合、指定されたクロージャが true
を返すときのみ例外が報告されます:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
NOTE
Redis を使用している場合、
Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
という middleware を使用することができます。これは Redis に特化して調整されており、基本的な例外スロットリングの middleware よりも効率的です。
Dispatching Jobs
job class を書き終えたら、job 自身のdispatch
method を使ってそれを dispatch できます。dispatch
method に渡される引数は、job のコンストラクタに渡されます。
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
もし条件付きで dispatch を job に送りたい場合、dispatchIf
とdispatchUnless
メソッドを使用することができます:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
新しい Laravel アプリケーションでは、sync
driver は default queue driver です。この driver は、現在の request のフォアグラウンドで jobs を同期的に実行するため、ローカル開発中によく便利です。実際に jobs をバックグラウンドの processing にキューに入れたい場合、アプリケーションのconfig/queue.php
設定ファイル内で異なる queue driver を指定することができます。
遅延ディスパッチ
もし job がすぐに queue ワーカーによる processing 可能な状態にしないことを指定したい場合は、 job をディスパッチする際にdelay
method を使用できます。例えば、ディスパッチされてから 10 分後まで job が processing 可能な状態にならないように指定しましょう:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
WARNING
Amazon SQS の queue service の最大 delay 時間は 15 分です。
ブラウザへの Response 送信後のディスパッチ
また、あなたの webserver が FastCGI を使用している場合、dispatchAfterResponse
method は、jobs の dispatch を HTTP response が user のブラウザに送信されてから遅らせます。これにより、queue に入っている jobs がまだ実行中であっても、user が application を使用し始めることが可能になります。これは、mail の送信など、約 1 秒かかる jobs に対して通常使用されるべきです。これらは現在の HTTP request の中で処理されるため、この方法で dispatch された jobs を処理するためには、queue ワーカーが稼働している必要はありません。
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
また、dispatch
でクロージャーを実行し、afterResponse
の method をdispatch
helper にチェーンして、 HTTP response がブラウザに送信された後にクロージャーを実行することもできます:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
同期ディスパッチ
すぐに(同期的に) job を dispatch したい場合は、 dispatchSync
method を使用することができます。この method を使用すると、 job は queue に入れられず、現在の process 内で即座に実行されます。
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
Jobs & Database Transactions
database transactions 内で dispatch jobs を行うことは全く問題ありませんが、 job が実際に成功することを確実にするためには特別な注意が必要です。 transaction 内で job を発行すると、 job が親の transaction がコミットされる前にワーカーによって処理される可能性があります。これが起こると、 database トランザクション中に models または database レコードに対して行った更新は、まだ database に反映されていない可能性があります。さらに、トランザクション中に作成された models または database レコードは、 database に存在しない可能性があります。
幸いなことに、Laravel はこの問題を回避するためのいくつかの方法を提供しています。まず、after_commit
接続 option をあなたの queue 接続の設定 array に設定することができます:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
after_commit
option が true
の場合、databasetransactions 内で job を dispatch することができます。しかし、Laravel は、親の database transactions が実際にコミットされるまでジョブの dispatch を待つでしょう。もちろん、現在オープンしている database transactions がない場合、job はすぐに dispatch されます。
transaction が発生中の例外によりロールバックされた場合、その transaction 中に dispatch された jobs は破棄されます。
NOTE
after_commit
設定オプションをtrue
に設定すると、キューに入った event listeners 、メール、 notifications 、および broadcast events は、すべての開いている database transactions がコミットされた後に処理されます。
コミット Dispatch の振る舞いをインラインで指定する
after_commit
の queue 接続設定 option をtrue
に設定しない場合でも、特定の job がすべての開いている database transactions がコミットされた後に queue connection するように指定することができます。これを達成するために、 dispatch 操作に対してafterCommit
method をチェーンさせることができます。
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同様に、after_commit
設定オプションがtrue
に設定されている場合、特定の job が、開かれた database transactions がコミットされるのを待たずに直ちにディスパッチされるべきであることを示すことができます。
ProcessPodcast::dispatch($podcast)->beforeCommit();
Job Chaining
Job chaining は、待機中の job リストを指定して、それらを primary job が正常に実行した後に sequence で実行することを可能にします。sequence 中の 1 つの job が失敗すると、残りの job は実行されません。待機中の job チェインを実行するには、Bus
facade によって提供されるchain
method を使うことができます。Laravel の command バスは、待機中の job の dispatch が構築される低レベルの component です:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
job class のインスタンスを連鎖させるだけでなく、クロージャも連鎖させることができます。
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
WARNING
Deleting jobs を
$this->delete()
の method で job 内から削除しても、連鎖的な jobs の処理が阻止されるわけではありません。チェーンの実行が停止するのは、チェーン内の job が失敗した場合のみです。
チェーン Connection と Queue
チェーンされた jobs に使用するべき connection と queue を指定したい場合は、onConnection
およびonQueue
メソッドを使用することができます。これらのメソッドは、 queue connection と queue の名前を指定します。これらは、キューに入れられた job が明示的に別の connection / queue が割り当てられていない unless 場合に使用するべきものです:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
チェーンに Jobs を追加する
たまに、既存の job チェーンから別の job チェーンに job を追加したり、その前に追加したりする必要があります。これは、prependToChain
およびappendToChain
method を使用して実現できます。
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}
チェーンの失敗
jobs を連鎖させる際には、チェーン内の job が失敗した場合に呼び出されるべきクロージャを指定するためにcatch
method を使用することができます。与えられたコールバックは、 job の失敗を引き起こしたThrowable
インスタンスを受け取ります:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
WARNING
チェーンコールバックはシリアル化され、後で Laravel queue によって実行されるため、チェーンコールバック内で
$this
変数を使用すべきではありません。
Queue と Connection のカスタマイズ
特定の Queue へのディスパッチ
onQueue
method を使用して jobs を dispatch するときに、'queues'を特定するために以下の通りとなります。'jobs'を異なる'queues'に押し出すことで、queues に入れた'jobs'を"カテゴリー化"することができ、さらに各'queues'に割り当てるワーカーの数を優先順位付けすることができます。しかし、これは'jobs'をあなたの'queues'設定ファイルで定義された異なる'queues' "接続"に押し出すものではなく、単一の接続内の特定の'queues'に対してのみ押し出します。
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
あるいは、ジョブのコンストラクタ内で onQueue
method を呼び出すことにより、ジョブの queue を指定することもできます。
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
特定の Connection へのディスパッチ
あなたの application が複数の queue 接続とやり取りを行う場合、onConnection
method を使用して job をプッシュする connection を指定できます。
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
onConnection
と onQueue
の methods を連結して、job の コネクションと queue を指定することができます。
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
あるいは、ジョブのコンストラクタ内でonConnection
method を呼び出すことにより、ジョブの connection を指定することもできます:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
最大 Job の試行回数 / Timeout Values の指定
最大試行回数
あなたのキューに入っている jobs の一つが error に遭遇している場合、おそらくそれが無限に再試行することは望ましくないでしょう。そのため、 Laravel は、 job が試みられる回数や期間を指定するさまざまな方法を提供しています。
仕事の試行可能最大数を指定する一つの方法は、--tries
スイッチを Artisan command ラインに使うことです。これは、処理中の仕事が試行可能な回数を指定しない限り、ワーカーによって処理されるすべての仕事に適用されます。
php artisan queue:work --tries=3
jobs が最大試行回数を超えると、"失敗した" job とみなされます。失敗した job の取り扱いについての詳細は、失敗した job のドキュメンテーションをご覧ください。--tries=0
がqueue:work
command に提供された場合、 job は無限に再試行されます。
job class 自体で job が試みることができる最大回数を定義することで、より詳細なアプローチを取ることができます。最大試行回数が job 上で指定されている場合、それは command ライン上で提供される--tries
value よりも優先されます。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
特定の job の最大試行回数に対して動的な制御が必要な場合は、その job にtries
method を定義することができます。
/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}
時間ベースの試み
job が失敗するまでに何度試行されるかを定義する代わりに、 job が試行されなくなるべき時間を定義することもできます。これにより、特定の時間枠内であれば job を何度でも試行することができます。 job が試行されなくなるべき時間を定義するために、retryUntil
method を job class に追加してください。この method はDateTime
のインスタンスを返すべきです:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
NOTE
また、queue に入れられた event listenersに対して、
tries
プロパティまたはretryUntil
method を定義することもできます。
Max Exceptions
時々、 job を何度も試行できるように設定したい場合がありますが、リトライが特定の数の未処理の exceptions によってトリガーされた場合には失敗すべきだと考えられます(release
method によって直接リリースされるのではなく)。これを実現するために、maxExceptions
プロパティを job class に定義することができます:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
この例では、もし application が Redis ロックを取得できない場合、 job は 10 秒間リリースされ、最大 25 回まで再試行されます。ただし、 job が job によってスローされた 3 つの未処理の exceptions により失敗します。
Timeout
多くの場合、あなたは自分の queue に入れられた jobs がどのくらいの時間かかるかを概算しています。このため、 Laravel では、timeout の value を指定することができます。 default では、 timeout value は 60 秒です。 job が timeout value で指定された秒数よりも長く processing していた場合、その job を processing しているワーカーは error で終了します。通常、ワーカーはあなたの servers で設定された process マネージャーによって自動的に再起動されます。
jobs が実行できる最大秒数は、 Artisan command 行で--timeout
スイッチを使用して指定できます。
php artisan queue:work --timeout=30
job がタイムアウトを繰り返して最大試行回数を超えると、失敗とマークされます。
また、 job class 自体に job が実行を許可される最大秒数を定義することもできます。もし timeout が job 上で指定されている場合、それは command ライン上で指定された任意の timeout よりも優先されます。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
場合によっては、ソケットや発信用の HTTP 接続などの IO ブロッキングプロセスがあなたが指定した timeout を尊重しない場合があります。そのため、これらの features を使用するときは、常にそれらの API を使用して timeout を指定するように attempt するべきです。例えば、Guzzle を使用する場合は、常に connection と request timeout value を指定するべきです。
WARNING
pcntl
PHP 拡張は、"job"のタイムアウトを指定するためにインストールされている必要があります。さらに、job の"timeout"value は常に、その"retry after"value よりも小さくなければなりません。 さもないと、"job"が実際に実行され終わったりタイムアウトする前に再試行される可能性があります。
Timeout で失敗
job を失敗とマークするべきであることを示したい場合は、タイムアウト時に$failOnTimeout
プロパティを jobclass で定義することができます。
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;
Error Handling
job が処理されている最中に例外がスローされた場合、その job は自動的に queue に戻され、再度試行することができます。 job は、あなたの application で許されている最大試行回数に達するまでリリースされ続けます。最大試行回数は、queue:work
Artisan command で使用される--tries
スイッチによって定義されます。また、最大試行回数は job class 自体で定義することもできます。 queue ワーカーの実行に関する詳細情報はこちらから参照できます。
手動での Job のリリース
時には、手動で release 、 job を queue に戻し、後で再試行できるようにすることが望ましいかもしれません。これは、release
method を呼び出すことで実現できます。
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->release();
}
default では、release
method は job を queue にすぐに処理するために戻します。しかし、整数または日付のインスタンスをrelease
method に渡すことで、一定の秒数が経過するまで job を処理できるように queue が作動しないように指示することができます:
$this->release(10);
$this->release(now()->addSeconds(10));
手動で Job を失敗させる
たまに、"" job ""を失敗として手動でマークする必要があるかもしれません。その場合は、fail
"" method ""を呼び出すことができます。
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->fail();
}
例外をキャッチしたために、 job を失敗としてマークしたい場合、その例外をfail
method に渡すことができます。また、便宜上、 string error メッセージを渡すこともできます。これはあなたのために例外に変換されます:
$this->fail($exception);
$this->fail('Something went wrong.');
NOTE
失敗した jobs に関する詳細情報は、job の失敗に対処するためのドキュメンテーションをご覧ください。
Job Batching
Laravel の job バッチ feature を使用すると、簡単に batch の jobs を実行し、batch の jobs が実行を完了したときに何らかの action を実行できます。始める前に、完了率などの job バッチに関する meta 情報を含むテーブルを作成するための database マイグレーションを作成する必要があります。このマイグレーションは、make:queue-batches-table
Artisan command を使用して生成できます。
php artisan make:queue-batches-table
php artisan migrate
バッチ可能な Jobs の定義
batch 化可能な job を定義するには、通常どおりに queue 可能な jobを作成します。ただし、Illuminate\Bus\Batchable
トレイトを jobclass に追加する必要があります。このトレイトは、batch
method へのアクセスを提供し、job が実行中の現在の batch を取得するために使用することができます:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// Import a portion of the CSV file...
}
}
バッチのディスパッチ
batch オブ jobs を dispatch するには、 Bus
facade の batch
method を使用する必要があります。もちろん、batch 処理は完了コールバックと組み合わせて使用することが主に有効です。 したがって、 batch の完了コールバックを定義するために、 then
、catch
、および finally
method を使用することができます。 これらのコールバックは、呼び出されるときに Illuminate\Bus\Batch
インスタンスを受け取ります。この例では、CSV ファイルから指定された行数を process する jobs の batch を queue イングしていると想像してみてください:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
return $batch->id;
batch の ID は、$batch->id
プロパティを介してアクセスでき、dispatch 後の batch に関する情報について、query という Laravel command バスを使用して問い合わせることができます。
WARNING
batch のコールバックはシリアル化され、後で Laravel queue によって実行されますので、コールバック内で
$this
変数を使用しないでください。さらに、バッチ化された jobs は database transactions 内でラップされているため、暗黙のコミットを実行する database ステートメントは jobs 内で実行されるべきではありません。
バッチの命名
バッチが名前付けられている場合、 Laravel Horizon や Laravel Telescope のようなツールは、バッチに対するよりユーザーフレンドリーな debug 情報を提供するかもしれません。任意の名前を batch に割り当てるためには、name
method を batch を定義する際に呼び出すことができます。
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();
Batch Connection と Queue
バッチ化された jobs で使用するべき connection と queue を指定したい場合は、onConnection
メソッドと onQueue
メソッドを使用することができます。すべてのバッチ化された jobs は、同じ connection と queue 内で実行する必要があります:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();
チェーンとバッチ
chained jobs のセットを、 array に chained jobs を置くことにより batch 内で定義することができます。例えば、二つの job チェーンを平行して実行し、両方の job チェーンが processing を終了したときにコールバックを実行することができます:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
逆に、chain内でバッチを定義することにより、一連の jobs を一括で実行できます。例えば、最初に複数のポッドキャストを release するための batch の jobs を実行し、その後 release notifications を送るための batch の jobs を実行できます。
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
バッチに Jobs を追加する
時には、バッチ化された job の中から追加の jobs を batch に加えることが有用な場合があります。これは、数千の jobs を batch 化する必要があり、それが web request の間に dispatch するには時間がかかりすぎる場合に役立ちます。したがって、代わりに、最初の batch を"loader"の jobs で dispatch し、その batch をさらに多くの jobs で補充することを希望するかもしれません:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
この例では、LoadImportBatch
job を使用して、追加の jobs で batch を補充します。これを達成するためには、ジョブのbatch
method を介してアクセス可能な batch インスタンス上のadd
method を使用することができます。
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
WARNING
同じ batch に所属する job からのみ、 jobs を batch に追加することができます。
バッチの検査
Illuminate\Bus\Batch
インスタンスは、 batch の完了コールバックに提供され、特定の batch の jobs を操作および検査するためのさまざまなプロパティとメソッドを備えています。
// The UUID of the batch...
$batch->id;
// The name of the batch (if applicable)...
$batch->name;
// The number of jobs assigned to the batch...
$batch->totalJobs;
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
// The number of jobs that have failed...
$batch->failedJobs;
// The number of jobs that have been processed thus far...
$batch->processedJobs();
// The completion percentage of the batch (0-100)...
$batch->progress();
// Indicates if the batch has finished executing...
$batch->finished();
// Cancel the execution of the batch...
$batch->cancel();
// Indicates if the batch has been cancelled...
$batch->cancelled();
Routes からバッチを返す
すべての Illuminate\Bus\Batch
インスタンスは JSON シリアライズ可能であり、つまり、あなたのアプリケーションの routes から直接それらを返すことができます。これにより、 batch に関する情報、特にその完了進捗の情報を含む JSON ペイロードを取得できます。これにより、アプリケーションの UI でバッチの完了進捗情報を表示することが便利になります。
ID によって batch を取得するには、Bus
facade のfindBatch
method を使用することができます:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
バッチのキャンセル
時々、特定のバッチの実行を cancel する必要があるかもしれません。これは、Illuminate\Bus\Batch
インスタンス上のcancel
method を呼び出すことで達成できます:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
前の例でお気づきかもしれませんが、batch 化された job は、その対応する batch が cancel される前に通常決定する必要があります。ただし、便宜上、SkipIfBatchCancelled
middleware を job に代入することもできます。その名前が示すように、この middleware は、Laravel に対して、その対応する batch が cancel された場合は job を処理しないよう指示します。
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
Batch 失敗
バッチ処理の job が失敗すると、catch
コールバック(割り当てられている場合)が呼び出されます。このコールバックは、 batch 内で最初に失敗した job に対してのみ呼び出されます。
失敗を許可する
batch 内の job が失敗した場合、 Laravel は自動的に batch をキャンセル済にマークします。必要であれば、 job の失敗が自動的に batch をキャンセルしないように、この動作を無効にすることも可能です。これは、 batch をディスパッチしている間に、allowFailures
method を呼び出すことで実現できます。
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();
失敗した Batch Jobs の再試行
便利のために、Laravel はqueue:retry-batch
Artisan command を提供しており、特定の batch の失敗した jobs すべてを簡単に retry することができます。 queue:retry-batch
command は、retry するための失敗した jobs を持つ batch の UUID を受け付けます。
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
バッチの剪定
job_batches
テーブルが非常に迅速にレコードを蓄積することがあります。これを軽減するためには、queue:prune-batches
の Artisan command をscheduleして、毎日実行すべきです。
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
default では、24 時間以上前に終了したすべての batches が整理されます。 hours
option を使って、command を呼び出す際に batchesdata を保持する時間を決定することができます。例えば、次の command は、48 時間前に終了したすべての batches を削除します:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
時折、あなたの jobs_batches
テーブルは、成功して完了しなかったバッチの batch レコードを蓄積する可能性があります。例えば、 job が失敗し、その job が成功するまで再試行されなかったバッチなどです。これら未完の batch レコードを削除するために、あなたはqueue:prune-batches
command にunfinished
オプションを使用して指示することができます:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同様に、あなたの jobs_batches
テーブルはキャンセルされたバッチの batch レコードも蓄積するかもしれません。 queue:prune-batches
command を指示して、cancelled
オプションを使ってこれらのキャンセルされた batch レコードを削除することができます:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
バッチを DynamoDB に保存する
Laravel は、リレーショナルな database の代わりに、DynamoDB に batch meta 情報を格納するサポートも提供しています。ただし、 batch レコードをすべて格納するための DynamoDB テーブルを手動で作成する必要があります。
通常、このテーブルはjob_batches
と名付けるべきですが、application のqueue
設定ファイル内のqueue.batching.table
設定の value に基づいてテーブルに名前をつけるべきです。
DynamoDB Batch テーブル設定
job_batches
テーブルは、application
という名前の string primary パーティションキーと、id
という名前の string primary ソートキーを持つべきです。キーのapplication
部分には、アプリケーションのapp
設定ファイル内のname
設定 value で定義されたアプリケーションの名前が含まれるでしょう。 application 名は DynamoDB テーブルのキーの一部であるため、同じテーブルを使用して複数の Laravel アプリケーションの job バッチを保存することができます。
さらに、自動的な batch 剪定を活用したい場合は、テーブルに対してttl
attribute を定義することもできます。
DynamoDB の設定
次に、あなたの Laravel application が Amazon の DynamoDB と通信できるように、AWS SDK をインストールします。
composer require aws/aws-sdk-php
次に、queue.batching.driver
設定オプションの value をdynamodb
に設定します。さらに、key
、secret
、およびregion
設定の options をbatching
設定の array 内に定義する必要があります。これらの options は、AWS との認証に使用されます。dynamodb
driver を使用する場合、queue.batching.database
設定オプションは不要です。
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
DynamoDB でのバッチの剪定
DynamoDB を使用して job batch 情報を保存する場合、リレーショナル database に保存された batches を剪定するために使用される一般的な剪定 commands は機能しません。代わりに、DynamoDB のネイティブ TTL 機能 を利用して、古い batches のレコードを自動的に削除することができます。
DynamoDB テーブルを ttl
attribute で定義した場合、batch レコードを削除する方法を Laravel に指示するための設定パラメータを定義できます。queue.batching.ttl_attribute
設定 value は TTL を保持する attribute の名前を定義し、queue.batching.ttl
設定 value はレコードが更新されてからどのくらいの時間が経過した後に batch レコードを DynamoDB テーブルから削除できるかを定義します。
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
Queueing Closures
job class を queue に dispatch する代わりに、クロージャを dispatch することもできます。これは、現在の request サイクルの外部で実行する必要がある速やかで単純なタスクに最適です。クロージャを queue に dispatch する際、クロージャの code content は、転送中に変更されないように暗号化されて署名されます。
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
catch
method を使用すると、queue の設定された retry の試みをすべて使い果たした後に、queue のクロージャが正常に完了しなかった場合に実行されるべきクロージャを提供できます。
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
WARNING
catch
のコールバックはシリアル化され、後で Laravel queue によって実行されるため、catch
コールバック内で$this
変数を使用すべきではありません。
Running the Queue Worker
queue:work
Command
Laravel には Artisan command が含まれており、queue 上に新しい jobs が追加されると、queue の作業者を起動して新しい jobs を process します。queue:work
Artisan command を使用して作業者を実行することができます。queue:work
command を開始すると、手動で停止するかターミナルを閉じるまで実行し続けることに注意してください。
php artisan queue:work
NOTE
queue:work
process をバックグラウンドで常に実行し続けるためには、 process 監視ツールであるSupervisorのようなものを使用して、 queue ワーカーが実行を停止しないようにするべきです。
コマンドの output に処理済みの job ID を含めたい場合、queue:work
command を呼び出す際に、-v
フラグを含めることができます。
php artisan queue:work -v
覚えておいてください、 queue のワーカーは長時間生き続けるプロセスで、起動した application state をメモリに保存します。したがって、ワーカーが起動した後では、 code ベースの変更は認識しないでしょう。したがって、 deployment process の間には、必ずqueue のワーカーを再起動してください。さらに、あなたの application が作成または変更した任意の静的な state は、 jobs の間に自動的に reset されないことを覚えておいてください。
あるいは、queue:listen
の command を実行することもできます。queue:listen
の command を使用すると、更新されたコードをリロードしたり、application の状態を reset したいときに、手動でワーカーを再起動する必要はありません。しかし、この command はqueue:work
の command よりも効率が著しく低いです。
php artisan queue:listen
複数の Queue ワーカーの実行
複数のワーカーを queue に割り当てて process jobs concurrently するためには、単純に複数のqueue:work
プロセスを開始するだけです。これは、ターミナルの複数のタブをローカルで使用するか、または production で process マネージャーの設定を使用して行うことができます。Supervisor を使用する場合、numprocs
設定の value を使用することができます。
Connection と Queue を指定する
また、ワーカーが使用するべき queue connection を指定することもできます。work
command に渡された connection 名は、config/queue.php
設定ファイルで定義された接続のいずれかに対応するべきです:
php artisan queue:work redis
default として、queue:work
command は、特定の接続上の defaultqueue のみの jobs を処理します。ただし、特定の接続のための特定の queue のみを処理することにより、queue ワーカーをさらにカスタマイズすることができます。例えば、すべての mail が、あなたのredis
queue 接続上のemails
queue で処理される場合、次の command を発行して、その queue のみを処理するワーカーを開始することができます:
php artisan queue:work redis --queue=emails
Processing という特定の数の Jobs の処理
--once
オプションは、ワーカーに対して queue から single job を process するよう指示するために使用できます:
php artisan queue:work --once
--max-jobs
option は、指定された数の jobs を処理した後にワーカーに終了を指示するために使用できます。この option はSupervisorと組み合わせることで、ワーカーが特定の数の jobs の処理を完了した後に自動的に再開され、蓄積されたメモリを解放する際に役立つかもしれません。
php artisan queue:work --max-jobs=1000
Processing すべてのキューに入れられた Jobs そしてその後終了
--stop-when-empty
option は、全ての jobs を process した後にワーカーに対して正常終了するよう指示するために使用できます。Docker コンテナ内で Laravel queues を processing していて、 queue が空になった後にコンテナをシャットダウンしたい場合に便利な option です:
php artisan queue:work --stop-when-empty
特定の秒数での Processing Jobs
--max-time
オプションは、指定された秒数だけ processing jobs を行い、その後で終了するようにワーカーに指示するために使用できます。このオプションは、ワーカーが指定された時間だけ process jobs を行った後に自動的に再起動され、蓄積したメモリが解放されるように、Supervisorと組み合わせて使用すると便利です。
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600
ワーカーの Sleep 時間
jobs が queue に利用可能な時、ワーカーは jobs の processing jobs を delay なしに続けます。ただし、sleep
オプションは、利用可能な jobs がない場合にワーカーが眠る秒数を決定します。もちろん、眠っている間、ワーカーは新しい jobs を process しません:
php artisan queue:work --sleep=3
Maintenance Mode and Queues
あなたの application がmaintenance modeにある間、キューに入った jobs は処理されません。 jobs は、 application がメンテナンスモードから戻った後、通常どおりに処理されます。
--force
オプションを使用して、メンテナンスモードが有効でも queue のワーカーに process jobs させることができます。
php artisan queue:work --force
Resource の考慮事項
Daemon queue ワーカーは、各 job を processing する前にフレームワークを "reboot"しません。したがって、各 job が終了した後は、重い resources を release する必要があります。例えば、GD library を用いて image 操作を行っている場合は、 image の processing が終了したら imagedestroy
でメモリを解放すべきです。
Queue の優先度
時折、どのように queues が処理されるかを優先したい場合があります。例えば、config/queue.php
の設定ファイルで、redis
の connection に対する default のqueue
をlow
に設定することができます。しかし、たまには、以下のようにhigh
の優先度を持つ queue に job をプッシュしたいこともあるでしょう。
dispatch((new Job)->onQueue('high'));
high
queue jobs がすべて処理されてから、 low
queue の jobs に進むことを確認するワーカーを開始するには、 work
command にカンマで区切られた queue の名前のリストを渡します。
php artisan queue:work --queue=high,low
Queue ワーカーと Deployment
queue ワーカーは長寿命なプロセスなので、再起動されるまであなたの code の変更に気づかないでしょう。よって、 queue ワーカーを使って application をデプロイする最もシンプルな方法は、 deployment process の間にワーカーを再起動することです。 queue:restart
command を発行することで、すべてのワーカーを優雅に再起動することができます。
php artisan queue:restart
この command は、すべての queue ワーカーに対して、現在の job の processing が終了したら、既存の jobs が失われないように、優雅に終了するように指示します。 queue ワーカーは queue:restart
command が実行されると終了するので、 queue ワーカーを自動的に再起動するために、 Supervisor のような process マネージャーを実行している必要があります。
NOTE
queue は再起動シグナルを保存するためにcacheを使用します。したがって、この feature を使用する前に、 application の cache driver が適切に設定されていることを確認するべきです。
Job の有効期限とタイムアウト
Job の有効期限
config/queue.php
設定ファイルでは、各 queue connection が retry_after
オプションを定義しています。このオプションは、queue connection が job の再試行を行う前にどのくらいの秒数待機するかを指定します。例えば、retry_after
の value が 90
に設定されている場合、job が 90 秒間処理され、リリースまたは削除されなかった場合、queue に再度リリースされます。通常、retry_after
の value を、jobs が完了するのに合理的にかかる最大秒数に設定する必要があります。
WARNING
retry_after
の value を持たない唯一の queue 接続は Amazon SQS です。SQS は、AWS の console 内で管理されているDefault の視認性タイムアウト に基づいて job を再試行します。
ワーカータイムアウト
queue:work
の Artisan command には--timeout
option があります。default では、--timeout
の value は 60 秒です。もし job の処理がタイムアウト値で指定された秒数より長く続く場合、その job を処理しているワーカーは error で終了します。通常、ワーカーはあなたの servers に設定された process マネージャーによって自動的に再起動されます。
php artisan queue:work --timeout=60
retry_after
設定オプションと--timeout
CLI オプションは異なりますが、一緒に動作して jobs が失われないことと、 jobs が一度だけ正常に処理されることを確保します。
WARNING
--timeout
の value は、常にretry_after
の設定 value よりも少なくとも数秒短くなければなりません。これにより、固まった job を processing しているワーカーが、 job が再試行される前に常に終了することが保証されます。もし--timeout
オプションがretry_after
の設定 value よりも長い場合、 jobs が二度処理される可能性があります。
Supervisor Configuration
production では、queue:work
timeout を稼働し続ける方法が必要です。queue:work
process は、ワーカータイムアウト を超過したり、queue:restart
command の実行など、様々な理由で稼働を停止することがあります。
このため、あなたの queue:work
プロセスが終了したときに検出し、自動的に再起動できる process モニターを設定する必要があります。さらに、 process モニターは、あなたが concurrently で実行したい queue:work
プロセスの数を指定できます。Supervisor は、Linux environments で一般的に使用される process モニターであり、次のドキュメントでその設定方法について説明します。
Installing Supervisor
Supervisor は Linux オペレーティングシステムの process 監視ツールであり、queue:work
プロセスが失敗した場合は自動的に再起動します。Ubuntu に Supervisor をインストールするには、次の command を使用することができます:
sudo apt-get install supervisor
NOTE
Supervisor 自体の設定と管理が厳しそうだと思われる方は、Laravel Forge の使用を検討してみてください。これにより、 production Laravel のプロジェクトに対して Supervisor が自動的にインストールおよび設定されます。
Supervisor の設定
Supervisor の構成ファイルは通常、/etc/supervisor/conf.d
ディレクトリに保存されます。このディレクトリ内では、プロセスがどのように監視されるべきかを Supervisor に指示する任意の数の設定ファイルを作成することが可能です。例えば、queue:work
プロセスを起動し監視するlaravel-worker.conf
ファイルを作成してみましょう:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
この例では、numprocs
ディレクティブは Supervisor に対し、8 つのqueue:work
プロセスを実行し、それらすべてを監視するよう指示します。それらが失敗した場合、自動的に再起動します。設定のcommand
ディレクティブは、あなたが望む queue connection とワーカーの options を反映するように変更すべきです。
WARNING
stopwaitsecs
の value が最長の job の実行にかかる秒数よりも大きいことを確認してください。そうでない場合、Supervisor は job が processing を終える前に殺してしまうかもしれません。
Starting Supervisor
設定ファイルが作成されたら、次のコマンドを使用して Supervisor の設定を update し、プロセスを起動することができます:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
詳しくは、Supervisor のドキュメント をご覧ください。
Dealing With Failed Jobs
時折、キューに入れた jobs が失敗します。心配しないでください、物事が常に計画通りに進むわけではありません! Laravel には、job が試行される最大回数を指定する 便利な方法が含まれています。非同期の job がこの試行回数を超えると、それは failed_jobs
database テーブルに挿入されます。同期的にディスパッチされた jobsは、失敗してもこのテーブルには保存されず、その exceptions はすぐに application によって処理されます。
failed_jobs
テーブルを作成する migration は、新規の Laravel アプリケーションには通常すでに存在します。しかし、もし、あなたの application がこのテーブル用の migration を含んでいない場合、make:queue-failed-table
command を使用して migration を作成することができます:
php artisan make:queue-failed-table
php artisan migrate
queue worker process を実行する際に、 --tries
スイッチを使用して、 job が試行される最大回数を指定することができます。 queue:work
command の --tries
オプションに value を指定しない場合、 jobs は一回だけ試行されるか、 job クラスの $tries
プロパティで指定された回数だけ試行されます。
php artisan queue:work redis --tries=3
--backoff
option を使用すると、 Laravel が例外に遭遇した job を再試行するまで何秒待つべきかを指定できます。 default では、 job はすぐに queue に戻されて再試行できるようになります:
php artisan queue:work redis --tries=3 --backoff=3
Laravel が例外に遭遇した job を再試行するまでに待つべき秒数をジョブごとに設定したい場合は、backoff
プロパティを job class に定義することで行うことができます:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
もし、ジョブの backoff 時間を決定するためのより複雑なロジックが必要な場合、 job class にbackoff
method を定義することができます。
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}
backoff
method からの array を返すことで、"指数関数的"なバックオフを簡単に設定できます。この例では、最初のリトライのためのリトライ遅延は 1 秒、2 回目のリトライは 5 秒、3 回目のリトライは 10 秒、それ以降の残りの試行回数がある場合のすべてのリトライは 10 秒となります。
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
失敗した Jobs の後始末
特定の job が失敗した場合、 users に alert を送信したり、 job によって部分的に完了したアクションを取り消すことを望むかもしれません。これを達成するために、 job class 上に failed
method を定義することができます。 job を失敗させた原因となる Throwable
インスタンスが failed
method に渡されます:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
WARNING
failed
メソッドを呼び出す前に、新しい job のインスタンスが生成されます。したがって、handle
method 内で発生したかもしれない class プロパティの変更は失われます。
失敗した Jobs の再試行
failed_jobs
database テーブルに挿入されたすべての失敗した jobs を表示するには、queue:failed
Artisan command を使用できます。
php artisan queue:failed
queue:failed
command は、job の ID、接続、queue、失敗時刻、及びその他の情報をリストします。job の ID は、失敗した job を再試行するために使用することができます。例えば、ID がce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
の失敗した job を再試行するには、次の command を入力します。
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
必要に応じて、複数の ID を command に渡すことができます:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
また、特定の queue に対して失敗したすべての jobs を retry することもできます:
php artisan queue:retry --queue=name
すべての失敗した jobs を retry するには、queue:retry
command を実行し、ID としてall
を渡します:
php artisan queue:retry all
失敗した job を delete したい場合は、queue:forget
という command を使用できます:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
NOTE
Horizonを使用するとき、失敗した job を delete するためには、
queue:forget
command の代わりにhorizon:forget
command を使うべきです。
failed_jobs
テーブルからすべての失敗した jobs を delete するために、queue:flush
command を使用することができます:
php artisan queue:flush
不足している Models を無視する
Eloquent model を job に注入すると、 model は自動的にシリアル化されて queue に置かれ、 job が処理されるときに database から再取得されます。しかし、 job がワーカーによって処理を待っている間に model が削除されていた場合、 job は ModelNotFoundException
で失敗する可能性があります。
便宜上、ジョブの deleteWhenMissingModels
プロパティを true
に設定して、欠落している models を伴う job を自動的に 削除することを選択することができます。このプロパティが true
に設定されている場合、Laravel は例外を発生させずに job を静かに破棄します。
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
Jobs の剪定に失敗しました
failed_jobs
テーブルのレコードを剪定するために、queue:prune-failed
Artisan command を呼び出すことができます:
php artisan queue:prune-failed
default によれば、24 時間以上前のすべての失敗した job 記録は剪定されます。もし--hours
option を command に与えると、過去 N 時間以内に挿入された失敗した job 記録のみが保持されます。例えば、以下の command は 48 時間以上前に挿入されたすべての失敗した job 記録を削除します:
php artisan queue:prune-failed --hours=48
失敗した Jobs を DynamoDB に保存する
Laravel は、失敗した job の記録をリレーショナル database のテーブルではなく、DynamoDB に保存するサポートも提供しています。ただし、失敗した job の記録をすべて保存するための DynamoDB のテーブルは手動で作成する必要があります。通常、このテーブルは failed_jobs
と名付けるべきですが、application の queue
設定ファイル内の queue.failed.table
設定の value に基づいてテーブルの名前を付けるべきです。
failed_jobs
テーブルは、application
という名前の string primary パーティションキーと、uuid
という名前の string primary ソートキーを持つべきです。キーのapplication
部分には、アプリケーションのapp
設定ファイル内のname
設定 value によって定義されるアプリケーションの名前が含まれます。 application 名は DynamoDB テーブルのキーの一部であるため、複数の Laravel アプリケーションの失敗した jobs を同じテーブルに保存することができます。
また、AWS SDK をインストールして、あなたの Laravel application が Amazon の DynamoDB と通信できるように確認してください:
composer require aws/aws-sdk-php
次に、queue.failed.driver
の設定オプションの value をdynamodb
に設定します。さらに、失敗した job の設定 array 内に、key
、secret
、およびregion
の設定 options を定義する必要があります。これらの options は、AWS への認証に使用されます。dynamodb
の driver を使用する場合、queue.failed.database
の設定オプションは不要です:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
失敗した Job Storage の無効化
queue.failed.driver
設定オプションの value をnull
に設定することで、 Laravel に失敗した jobs を保存せずに破棄するよう指示できます。通常、これはQUEUE_FAILED_DRIVER
environment 変数を通じて達成できます:
QUEUE_FAILED_DRIVER=null
失敗した Job Events
job が失敗した時に呼び出される event リスナーを register したい場合は、Queue
ファサードのfailing
method を使用できます。例えば、 Laravel に含まれているAppServiceProvider
のboot
method からこの event にクロージャを attach することができます。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
Clearing Jobs From Queues
NOTE
Horizon を使用するときは、jobs を queue からクリアするために
queue:clear
command の代わりにhorizon:clear
command を使用すべきです。
もし全ての jobs を default connection の default queue から delete したい場合、queue:clear
の Artisan command を使用して行うことができます:
php artisan queue:clear
また、特定の connection と queue から delete jobs を削除するために、connection
引数と queue
オプションを提供することもできます:
php artisan queue:clear redis --queue=emails
WARNING
SQS、 Redis 、および database queue ドライバーのみが queues からの jobs の削除を利用できます。さらに、SQS メッセージ削除 process は最大 60 秒かかるため、 queue をクリアした後の 60 秒以内に SQS queue に送信された jobs も削除される可能性があります。
Monitoring Your Queues
あなたの queue が急激に queue job を受け取ると、その処理能力を超えてしまい、 jobs の完了までの待ち時間が長くなる可能性があります。もしよければ、 jobs が指定した閾値を超えたときに、 Laravel を使って alert を出すことができます。
始めるには、queue:monitor
の command を毎分実行するように schedule する必要があります。 command は、監視したい queues の名前と、希望する job のカウントのしきい values を受け入れます:
php artisan queue:monitor redis:default,redis:deployments --max=100
この command をスケジュールするだけでは、キューが溢れた status を示す notification をトリガーするには十分ではありません。 command が job の数があなたの閾値を超えている queue に遭遇したとき、Illuminate\Queue\Events\QueueBusy
event がディスパッチされます。この event は、アプリケーションの AppServiceProvider
でリッスンして、あなたやあなたの開発チームに notification を送ることができます。
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
Testing
job を dispatches する testing コードを testing する際には、job のコードを直接、そしてそれを dispatches するコードから切り離して testing することができるため、Laravel に対して実際に job を実行しないよう指示することが望ましいかもしれません。もちろん、job そのものを testing するためには、job のインスタンスを生成し、testing でhandle
method を直接呼び出すこともできます。
Queue
facade のfake
method を使用して、キューに実際にプッシュされる jobs を防ぐことができます。Queue
facade のfake
method を呼び出した後、application が jobs を queue にプッシュしようとしたことを assert することができます:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
assertPushed
またはassertNotPushed
methods にクロージャを渡すことで、特定の"真実 test"に合格する"job"がプッシュされたことを"主張"できます。少なくとも 1 つの"job"が指定された真実 test に合格してプッシュされた場合、その主張は成功となります:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Jobs の一部を偽装する
他の jobs を通常通り実行させつつ、特定の jobs だけを faking する必要がある場合、faking すべき jobs の class 名をfake
method に渡すことができます:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
あなたはexcept
の method を使用して、特定のセットの jobs を除いて、すべての jobs を fake することができます。
Queue::fake()->except([
ShipOrder::class,
]);
Testing Job チェーン
job チェーンを test するには、Bus
facade のフェイキング機能を利用する必要があります。Bus
facade の assertChained
method を使用して、jobs のチェーンが dispatch されたことを assert することができます。assertChained
method は、その最初の引数としてチェーンされた jobs の array を受け入れます:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
上記の例で見ることができるように、連鎖した jobs の array は、ジョブの class 名の array であるかもしれません。しかし、実際の job インスタンスの array を提供することも可能です。 Laravel がその場合には、提供された job インスタンスが同じ class であり、連鎖した jobs があなたの application によって発行されたときと同じプロパティ values を持っていることを確認します。
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
assertDispatchedWithoutChain
の method を使用して、チェーンなしの job がプッシュされたことを assert することができます。
Bus::assertDispatchedWithoutChain(ShipOrder::class);
Testing チェーンの変更
もし連鎖された job が既存のチェーンに jobs を先頭または末尾に追加する場合、その job が残りの jobs の期待されるチェーンを持っていることを assert するために job のassertHasChain
method を使用することができます:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
assertDoesntHaveChain
method は、job の残りのチェーンが空であることを主張するために使用できます。
$job->assertDoesntHaveChain();
Testing チェーン化されたバッチ
もし、あなたの job チェーンがbatch の jobs を含んでいる場合、チェーンアサーション内にBus::chainedBatch
の定義を挿入することで、連鎖した batch があなたの期待通りであると assert することができます。
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
Testing Job バッチ
Bus
facade の assertBatched
method は、jobs の batchが dispatch されたことを assert するために使用できます。assertBatched
method に渡されるクロージャは Illuminate\Bus\PendingBatch
のインスタンスを受け取り、それを使用して batch 内の jobs を検査することができます:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
指定された batches 数が dispatch されたことをassertBatchCount
method で確認することができます:
Bus::assertBatchCount(3);
assertNothingBatched
を使って、バッチが一つもディスパッチされていないことを assert することができます。
Bus::assertNothingBatched();
Testing Job / Batch インタラクション
また、たまに個々のジョブがその基になる batch とどのように相互作用するかのテストが必要になることがあります。たとえば、 job がその batch に対するさらなる processing をキャンセルしたかどうかをテストする必要があるかもしれません。これを達成するには、withFakeBatch
method を使って job に対して fake batch を割り当てる必要があります。withFakeBatch
method は、 job インスタンスと fake batch を含むタプルを返します。
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
Testing Job / Queue の相互作用
時々、queue の中にある job が自身を queue に戻すことを確認する必要があるかもしれません。または、job が自身を削除することを確認する必要があるかもしれません。これらの queue とのやり取りを test するために、job をインスタンス化し、withFakeQueueInteractions
method を呼び出すことができます。
ジョブの queue インタラクションが偽装されたら、 job の上でhandle
method を呼び出すことができます。 job を呼び出した後、assetReleased
、assertDeleted
、およびassertFailed
メソッドを使用して、ジョブの queue インタラクションに対するアサーションを行うことができます:
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertFailed();
Job Events
before
とafter
メソッドをQueue
facade上で使用すると、キューの job が処理される前後に実行されるコールバックを指定することができます。これらのコールバックは、ダッシュボードのための追加の logging や increment 統計を実行する絶好の機会です。通常、これらのメソッドは、service providerのboot
method から呼び出すべきです。例えば、 Laravel に含まれるAppServiceProvider
を使用することができます:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
looping
method を Queue
facadeに使用することで、ワーカーが queue から job を取得しようとする前に実行するコールバックを指定できます。たとえば、以前に失敗した job によって開かれたままの transactions を rollback するクロージャを register することができます。
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});