Lang x Lang

Queues

Table of Contents

Introduction

あなたの web application を構築する際に、アップロードされた CSV ファイルの解析や格納など、一般的な web request の間に実行するには時間がかかりすぎるタスクがいくつかあるかもしれません。幸いなことに、 Laravel は、バックグラウンドで処理できるキューに入った jobs を簡単に作成することを可能にしてくれます。時間を要するタスクを queue に移すことで、あなたの application は驚異的な速さで web リクエストに対応し、ユーザーにより良い user 体験を提供することができます。

Laravel queues は、Amazon SQS Redis 、あるいは関係 types database など、さまざまな queue バックエンドを通じて統一されたキューイング API を提供します。

Laravel の queue 設定 options は、アプリケーションのconfig/queue.php設定ファイルに保存されます。このファイルには、フレームワークに含まれる各 queue ドライバのための connection 設定があり、 database 、Amazon SQS Redis 、およびBeanstalkd ドライバ、さらには jobs を即時に実行する同期 driver (ローカル開発中に使用)が含まれています。また、キューに追加した jobs を破棄するnull queue driver も含まれています。

NOTE

今、 Laravel は Horizon を提供しています。これは、 Redis で動作する queues に対する美しいダッシュボードと設定システムです。詳細については、Horizon documentationをご覧ください。

接続 vs. Queues

Laravel queues を始める前に、 "connections" と "queues" の違いを理解することが重要です。config/queue.php 設定ファイルには、connections 設定 array があります。このオプションは、Amazon SQS、Beanstalk、または Redis などのバックエンド queue services への接続を定義します。ただし、任意の queue connection は、異なるスタックやキューの山と考えられる複数の "queues" を持つことができます。

queue設定ファイルの各 connection 設定例には、queueの attribute が含まれていることに注意してください。これは、 jobs が特定の connection に送信されるときにディスパッチされる default queue です。つまり、どの queue にディスパッチするかを明示的に定義せずに job を dispatch すると、その job はqueueの attribute で定義された queue に配置されます、という connection の設定です:

use App\Jobs\ProcessPodcast;

// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();

// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');

一部の applications では、複数の queues に jobs をプッシュする必要が一切なく、代わりに一つのシンプルな queue を持つことを好むかもしれません。しかし、jobs を複数の queues にプッシュすることは、jobs の処理を優先順位付けしたり、分割したりしたい applications にとって特に有用で、Laravel queue ワーカーは優先順位で処理すべき queues を指定することを可能にします。例えば、highの queue に jobs をプッシュしたら、それらにより高い processing 優先順位を与えるワーカーを実行することができます:

php artisan queue:work --queue=high,default

Driver のメモと前提条件

Database

databaseの queue driver を使用するためには、jobs を保持するための database テーブルが必要です。通常、これは Laravel の default 0001_01_01_000002_create_jobs_table.phpdatabase マイグレーションに含まれています。しかし、あなたの application にこのマイグレーションが含まれていない場合は、make:queue-tableの Artisan command を使用して作成することができます。

php artisan make:queue-table

php artisan migrate

Redis

redisの queue driver を使用するためには、config/database.php 設定ファイルで Redis database connection を設定する必要があります。

WARNING

serializer および compression Redis options は、redis queue driver ではサポートされていません。

Redis クラスター

あなたの Redis queue connection が Redis クラスターを使用している場合、あなたの queue 名には key hash tag を含める必要があります。これは、指定された queue のすべての Redis keys が同じ hash スロットに配置されることを確保するために required です。

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

Blocking

Redis queue を使用する際、block_for設定 option を使用して、driver がワーカーループを反復し、Redisdatabase を再ポーリングする前に job が利用可能になるまでどのくらい待つべきかを指定することができます。

この value を queue の負荷に基づいて調整することは、新しい jobs を探すために Redis database を継続的にポーリングするよりも効率的です。例えば、driver が job を待機する間に 5 秒間ブロックするように value を 5 に設定できます。

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

WARNING

block_for0に設定すると、 queue の作業者は、 job が利用可能になるまで無期限にブロックされます。これにより、次の job が処理されるまで、SIGTERMなどのシグナルが処理されなくなります。

その他の Driver に必要な条件

次に示す依存関係は、リストされた queue ドライバに必要です。これらの依存関係は Composer パッケージマネージャーを介してインストールできます。

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 または phpRedis PHP 拡張

Creating Jobs

Job クラスの生成

default によると、あなたの application のすべての queue 可能な jobs は app/Jobs ディレクトリに保存されます。もし app/Jobs ディレクトリが存在しない場合、make:job Artisan command を実行した時に作成されます。

php artisan make:job ProcessPodcast

生成された class は Illuminate\Contracts\Queue\ShouldQueue インターフェースを実装します。これは Laravel に対して、 job が非同期に実行するために queue にプッシュされるべきであることを示しています。

NOTE

Job スタブは スタブの公開を使用してカスタマイズすることができます。

Class の構造

Job クラスは非常にシンプルで、通常はhandle method のみを含み、このメソッドは job が queue によって処理される時に呼び出されます。始めるには、例の job class を見てみましょう。この例では、ポッドキャスト公開 service を管理していると仮定し、公開前にアップロードされたポッドキャストファイルを process する必要があるとします:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }
}

この例では、Eloquent modelsを queue に入れられたジョブのコンストラクタに直接渡すことができたことに注目してください。SerializesModels という特性を使っているため、job によって、Eloquent models とそれらがロードしたリレーションシップは、job が処理される際にスムーズにシリアライズおよびアンシリアライズされます。

キューに追加された job がコンストラクタで Eloquent model を受け取る場合、queue には model の識別子だけがシリアライズされます。実際に job が処理されると、queue システムは database からフルの model インスタンスとそのロードされたリレーションシップを自動的に再取得します。この model シリアライゼーションのアプローチにより、queue driver に送信される job ペイロードが大幅に小さくなります。

handle Method Dependency Injection

handle method は、 job が queue によって処理されたときに呼び出されます。我々は、 job のhandle method に対する依存関係を型ヒントとして指定することができることに注意してください。 Laravel service containerは自動的にこれらの依存関係を注入します。

コンテナが依存関係を handle method に注入する方法を完全に制御したい場合は、コンテナの bindMethod method を使用することができます。bindMethod method は、 job とコンテナを受け取るコールバックを受け付けます。コールバック内では、handle method を自由に呼び出すことができます。通常、App\Providers\AppServiceProviderboot method からこの method を呼び出すべきです。service provider:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

WARNING

バイナリ data、たとえば生の image の内容は、 Binary data に待機している job に渡す前に base64_encode 関数を通すべきです。そうでなければ、その job は queue に置かれる際に JSON に適切にシリアライズされないかもしれません。

キューの関係

すべてのロードされた Eloquent model の関連性も、 job がキューに積まれるときにシリアライズされるため、シリアライズされた job string はかなり大きくなることがあります。さらに、 job が逆シリアライズされて model の関連性が database から再取得されると、それらは完全に retrieved されます。 job のキューイング process 中にシリアライズされた model に以前に適用されていた関連性の制約は、 job が逆シリアライズされるときには適用されません。したがって、特定の関連性の一部分で作業したい場合は、キューに積まれた job 内でその関連性を再制約する必要があります。

または、リレーションがシリアライズされるのを防ぐために、プロパティ value を設定する際に model にwithoutRelations method を呼び出すことができます。この method はロードされたリレーションシップなしの model のインスタンスを返します:

/**
 * Create a new job instance.
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}

あなたが PHP コンストラクタプロパティプロモーションを使用しており、 Eloquent model がその関連性をシリアライズしないように指示したい場合、WithoutRelations attribute を使用することができます:

use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * Create a new job instance.
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast
) {
}

もし job が single model の代わりに collection または array の Eloquent models を受け取る場合、その collection 内の models は、 job がデシリアル化され実行されたときに関係性が復元されません。これは、大量の models を扱う jobs における過度の resource 使用を防ぐためです。

Unique Jobs

WARNING

uniquejobs は、ロックをサポートする cachedrivers が必要です。現在、memcachedredisdynamodbdatabasefile及びarraycachedrivers はアトミックロックをサポートしています。また、uniquejobs の制約は batch 内の jobs には適用されません。

時々、特定の job がある時点で queue 上に一つだけ存在することを確認したい場合があります。その場合、ShouldBeUniqueインターフェースを job class に実装することにより実現できます。このインターフェースは、あなたの class に追加のメソッドを定義する必要はありません:

<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

上記の例では、UpdateSearchIndexの job は unique です。したがって、job は、同じ job の別のインスタンスが既に queue にあり、処理がまだ終了していない場合には dispatch されません。

特定のケースでは、 job unique を作る特定の"key"を定義したい場合や、 job が unique のままでいられない timeout を指定したい場合があります。これを実現するために、uniqueIduniqueForのプロパティまたはメソッドを job class に定義できます:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * The product instance.
     *
     * @var \App\Product
     */
    public $product;

    /**
     * The number of seconds after which the job's unique lock will be released.
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * Get the unique ID for the job.
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

上記の例では、UpdateSearchIndex job は製品 ID によって unique です。ですから、同じ製品 ID を持つ新たな job の dispatches は、既存の job の処理が完了するまで無視されます。さらに、既存の job が 1 時間以内に処理されない場合、unique ロックは解除され、同じ uniquekey を持つ別の job が queue に dispatches することが可能になります。

WARNING

あなたの application が複数の web サーバーやコンテナから jobs をディスパッチしている場合、すべてのサーバーが同じ中央の cache サーバーと通信していることを確認するべきです。そうすれば Laravel が job が unique であるかどうかを正確に判断することができます。

Jobs Unique を Processing が始まるまで保持する

default では、unique job は job が処理を終了するか、すべてのリトライ試行に失敗した後に アンロックされます。ただし、job が処理される前にすぐにアンロックしたい場合もあるかもしれません。それを実現するためには、job はShouldBeUniqueコントラクトの代わりにShouldBeUniqueUntilProcessingコントラクトを実装するべきです。

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

Unique Job ロック

裏側では、ShouldBeUniqueの job が dispatch されると、 Laravel はuniqueIdkey を用いてロックを取得しようとします。もしロックが取得できなければ、その job は dispatch されません。このロックは、job が処理を完了するか、全てのリトライの試行が失敗すると release されます。default では、Laravel はこのロックを取得するために defaultcache ドライバを使用します。しかし、ロック取得のために別の driver を使用したい場合は、使用すべき cache ドライバを返すuniqueViaの method を定義することができます:

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * Get the cache driver for the unique job lock.
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

NOTE

job の同時処理数を制限するだけであれば、代わりにWithoutOverlapping ジョブ middleware を使用してください。

暗号化された Jobs

Laravel を使用すると、encryptionを介してジョブの data のプライバシーと integrity を確保できます。始めるには、単に ShouldBeEncrypted インターフェースを job class に追加します。このインターフェースが class に追加されると、 Laravel は自動的にあなたの job を暗号化し、それを queue にプッシュします:

<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

Job Middleware

Job middleware は、キューに入れられた jobs の実行に対して custom ロジックを適用し、それ自体の jobs での定型的なコードを減らすことができます。例えば、以下のhandle method は、Laravel の Redis rate limiting features を利用して、5 秒ごとに 1 つの job だけを process させるようにします:

use Illuminate\Support\Facades\Redis;

/**
 * Execute the job.
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // Handle job...
    }, function () {
        // Could not obtain lock...

        return $this->release(5);
    });
}

この code は有効ですが、handle method の実装は Redis rate limiting ロジックが散見されるために騒々しいものとなります。さらに、この rate limiting ロジックは、レート制限を行いたい他の jobs のためにも複製しなければなりません。

handle method での rate limiting の代わりに、 job middleware を定義して rate limiting を処理することができます。 Laravel には job middleware の default の位置がないため、あなたの application のどこにでも job middleware を配置しても構いません。この例では、app/Jobs/Middlewareディレクトリに middleware を配置します。

<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // Lock obtained...

                    $next($job);
                }, function () use ($job) {
                    // Could not obtain lock...

                    $job->release(5);
                });
    }
}

ご覧の通り、route middlewareと同様に、 job middleware は処理中の job と、その job の processing を続行するために呼び出される必要があるコールバックを受け取ります。

job middleware を作成した後、それらは middleware method から返すことで job に関連付けることができます。 この method は make:job Artisan command で設営した jobs には存在しないため、手動で job class に追加する必要があります。

use App\Jobs\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

NOTE

Job middleware は、キュー可能な event listeners 、mailables、および notifications にも割り当てることができます。

Rate Limiting

私たちはちょうどあなた自身の rate limiting job middleware の書き方をデモンストレーションしましたが、実際には Laravel はあなたが jobs のレート制限に使うことのできる rate limiting middleware を含みます。route rate limitersのように、 job rate limiters は、RateLimiter ファサードの for method を使って定義されます。

たとえば、 users が自身の data を 1 時間に 1 回バックアップできるように許可したい場合や、プレミアム顧客にはそのような制限を課さずに許可したい場合があります。これを達成するためには、AppServiceProviderの method であるbootRateLimiterを定義することができます。

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

上記の例では、時間単位のレート制限を定義しましたが、perMinute method を使用して分単位でのレート制限を簡単に定義できます。さらに、レート制限のby method に任意の value を渡すことができます。ただし、この value は最もよく、顧客ごとのレート制限を区分するために使用されます:

return Limit::perMinute(50)->by($job->user->id);

あなたが一度レート制限を定義したら、Illuminate\Queue\Middleware\RateLimited middleware を使用してそのレートリミッターをあなたの job に attach することができます。 job がレート制限を超えるたびに、この middleware は job をレート制限の期間に基づいた適切な delay で queue に再び release します。

use Illuminate\Queue\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

レート制限された job を queue に戻すと、その job の attempts の総数は増加します。triesmaxExceptions のプロパティを jobclass に合わせて調整することをお勧めします。または、retryUntil methodを使用して、job を試行しない時間を定義することもお勧めします。

もし job がレート制限に達した際に再試行させたくない場合、dontRelease method を使用することができます。

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

NOTE

Redis を使用している場合、Illuminate\Queue\Middleware\RateLimitedWithRedis middleware を使用できます。これは Redis に最適化されており、基本的な rate limiting middleware よりも効率的です。

Job の重複を防ぐ

Laravel には、任意の key に基づいて job の重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlapping middleware が含まれています。これは、一度に一つの job だけが変更すべき“ resource を変更する待機中の“ job がある場合に役立つことがあります。

例えば、ユーザーのクレジットスコアを更新する job が queue に入っていて、同じ user ID に対するクレジットスコアの update job が重複するのを防ぎたいとします。これを達成するためには、ジョブのmiddleware method からWithoutOverlapping middleware を返せばよいです。

use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

同じ type の重複する jobs は、 queue に戻されます。また、リリースされた job が再度試みられるまでの秒数も指定することができます。

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

あなたが、重複する jobs をすぐに delete して再試行されないようにしたい場合は、dontRelease method を使用することができます:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlappingという middleware は、Laravel のアトミックロック機能によって機能します。時折、あなたの job が予期せずに失敗したりタイムアウトしたりするとロックが解除されないことがあります。したがって、expireAftermethod を使用して、明示的にロックの有効期限を設定することができます。例えば、以下の例では、Laravel に job が処理を開始してから 3 分後にWithoutOverlappingロックを解放するように指示します:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

WARNING

WithoutOverlapping middleware は、ロックをサポートする cache driver が必要です。現在、memcachedredisdynamodbdatabasefilearray の cache drivers がアトミックロックをサポートしています。

Job クラス間でのロックの Keys 共有

default では、WithoutOverlappingmiddleware は同じ class の job のみがオーバーラップしないようにします。したがって、2 つの異なる jobclass が同じロック key を使用しても、それらがオーバーラップするのを防ぐことはありません。ただし、sharedmethod を使用して、Laravel に jobclass 全体に key を適用するよう指示することができます。

use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

スロットリング Exceptions

Laravel には、Illuminate\Queue\Middleware\ThrottlesExceptionsという middleware が含まれており、これにより例外のスロットリングを行うことができます。一旦 job が一定の数の例外を throws すると、その後の job の実行は、指定された時間間隔が経過するまで遅延します。この middleware は、不安定なサードパーティの services と対話する job に特に有用です。

たとえば、queue に入った job が、例外を throw し始めるサードパーティーの API とやり取りするとしましょう。これらの例外をスロットル(調整)するために、あなたはThrottlesExceptions middleware をあなたのジョブのmiddleware method から返すことができます。通常、この middleware は時間ベースの試行を実装する job と組み合わせて使用すべきです。

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5)];
}

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(5);
}

middleware が受け入れる最初のコンストラクタ引数は、job が throw できる exceptions の数であり、2 つめのコンストラクタ引数は、job がスロットルされた後、再試行するまでの経過すべき分数です。上記の code 例で、job が 5 分以内に 10 の exceptions を throw した場合、job を再試行するまで 5 分待つことになります。

job が例外を throws したが、例外の閾 values にはまだ達していない場合、その job は通常すぐに再試行されます。しかし、その job が遅延すべき分数を指定することができます。これは、 middleware を job に attach するときに backoff method を呼び出すことで行います。

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

内部的に、この middleware は Laravel の cache システムを使用して rate limiting を実装し、ジョブの class 名が cache の key として使用されます。このキーは、by method を使用して middleware をあなたの job に添付するときに上書きすることができます。これは、同じサードパーティの service とやりとりをする複数の jobs を持ち、それらが一般的なスロットリング bucket を共有したい場合に役立つかもしれません。

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10))->by('key')];
}

default では、この middleware はすべての例外をスロットルします。この挙動を変更するには、when method を用いて middleware をあなたの job に attach する際に呼び出します。その結果、when method がtrueを返す場合にのみ例外がスロットルされます。

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

例外( exceptions )をアプリケーションの例外( handler )に報告したい場合は、 middleware を job にアタッチする際に report method を呼び出すことで実現できます。オプションとして、report method にクロージャを提供することも可能で、その場合、指定されたクロージャが true を返すときのみ例外が報告されます:

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

NOTE

Redis を使用している場合、Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedisという middleware を使用することができます。これは Redis に特化して調整されており、基本的な例外スロットリングの middleware よりも効率的です。

Dispatching Jobs

job class を書き終えたら、job 自身のdispatch method を使ってそれを dispatch できます。dispatch method に渡される引数は、job のコンストラクタに渡されます。

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

もし条件付きで dispatch を job に送りたい場合、dispatchIfdispatchUnlessメソッドを使用することができます:

ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

新しい Laravel アプリケーションでは、sync driver は default queue driver です。この driver は、現在の request のフォアグラウンドで jobs を同期的に実行するため、ローカル開発中によく便利です。実際に jobs をバックグラウンドの processing にキューに入れたい場合、アプリケーションのconfig/queue.php設定ファイル内で異なる queue driver を指定することができます。

遅延ディスパッチ

もし job がすぐに queue ワーカーによる processing 可能な状態にしないことを指定したい場合は、 job をディスパッチする際にdelay method を使用できます。例えば、ディスパッチされてから 10 分後まで job が processing 可能な状態にならないように指定しましょう:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

WARNING

Amazon SQS の queue service の最大 delay 時間は 15 分です。

ブラウザへの Response 送信後のディスパッチ

また、あなたの webserver が FastCGI を使用している場合、dispatchAfterResponsemethod は、jobs の dispatch を HTTP response が user のブラウザに送信されてから遅らせます。これにより、queue に入っている jobs がまだ実行中であっても、user が application を使用し始めることが可能になります。これは、mail の送信など、約 1 秒かかる jobs に対して通常使用されるべきです。これらは現在の HTTP request の中で処理されるため、この方法で dispatch された jobs を処理するためには、queue ワーカーが稼働している必要はありません。

use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

また、dispatchでクロージャーを実行し、afterResponseの method をdispatchhelper にチェーンして、 HTTP response がブラウザに送信された後にクロージャーを実行することもできます:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同期ディスパッチ

すぐに(同期的に) job を dispatch したい場合は、 dispatchSync method を使用することができます。この method を使用すると、 job は queue に入れられず、現在の process 内で即座に実行されます。

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

Jobs & Database Transactions

database transactions 内で dispatch jobs を行うことは全く問題ありませんが、 job が実際に成功することを確実にするためには特別な注意が必要です。 transaction 内で job を発行すると、 job が親の transaction がコミットされる前にワーカーによって処理される可能性があります。これが起こると、 database トランザクション中に models または database レコードに対して行った更新は、まだ database に反映されていない可能性があります。さらに、トランザクション中に作成された models または database レコードは、 database に存在しない可能性があります。

幸いなことに、Laravel はこの問題を回避するためのいくつかの方法を提供しています。まず、after_commit 接続 option をあなたの queue 接続の設定 array に設定することができます:

'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit option が true の場合、databasetransactions 内で job を dispatch することができます。しかし、Laravel は、親の database transactions が実際にコミットされるまでジョブの dispatch を待つでしょう。もちろん、現在オープンしている database transactions がない場合、job はすぐに dispatch されます。

transaction が発生中の例外によりロールバックされた場合、その transaction 中に dispatch された jobs は破棄されます。

NOTE

after_commit設定オプションをtrueに設定すると、キューに入った event listeners 、メール、 notifications 、および broadcast events は、すべての開いている database transactions がコミットされた後に処理されます。

コミット Dispatch の振る舞いをインラインで指定する

after_commitの queue 接続設定 option をtrueに設定しない場合でも、特定の job がすべての開いている database transactions がコミットされた後に queue connection するように指定することができます。これを達成するために、 dispatch 操作に対してafterCommit method をチェーンさせることができます。

use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同様に、after_commit設定オプションがtrueに設定されている場合、特定の job が、開かれた database transactions がコミットされるのを待たずに直ちにディスパッチされるべきであることを示すことができます。

ProcessPodcast::dispatch($podcast)->beforeCommit();

Job Chaining

Job chaining は、待機中の job リストを指定して、それらを primary job が正常に実行した後に sequence で実行することを可能にします。sequence 中の 1 つの job が失敗すると、残りの job は実行されません。待機中の job チェインを実行するには、Bus facade によって提供されるchain method を使うことができます。Laravel の command バスは、待機中の job の dispatch が構築される低レベルの component です:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

job class のインスタンスを連鎖させるだけでなく、クロージャも連鎖させることができます。

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

WARNING

Deleting jobs を$this->delete()の method で job 内から削除しても、連鎖的な jobs の処理が阻止されるわけではありません。チェーンの実行が停止するのは、チェーン内の job が失敗した場合のみです。

チェーン Connection と Queue

チェーンされた jobs に使用するべき connection と queue を指定したい場合は、onConnectionおよびonQueueメソッドを使用することができます。これらのメソッドは、 queue connection と queue の名前を指定します。これらは、キューに入れられた job が明示的に別の connection / queue が割り当てられていない unless 場合に使用するべきものです:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

チェーンに Jobs を追加する

たまに、既存の job チェーンから別の job チェーンに job を追加したり、その前に追加したりする必要があります。これは、prependToChainおよびappendToChainmethod を使用して実現できます。

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    // Prepend to the current chain, run job immediately after current job...
    $this->prependToChain(new TranscribePodcast);

    // Append to the current chain, run job at end of chain...
    $this->appendToChain(new TranscribePodcast);
}

チェーンの失敗

jobs を連鎖させる際には、チェーン内の job が失敗した場合に呼び出されるべきクロージャを指定するためにcatch method を使用することができます。与えられたコールバックは、 job の失敗を引き起こしたThrowableインスタンスを受け取ります:

use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // A job within the chain has failed...
})->dispatch();

WARNING

チェーンコールバックはシリアル化され、後で Laravel queue によって実行されるため、チェーンコールバック内で$this変数を使用すべきではありません。

Queue と Connection のカスタマイズ

特定の Queue へのディスパッチ

onQueue method を使用して jobs を dispatch するときに、'queues'を特定するために以下の通りとなります。'jobs'を異なる'queues'に押し出すことで、queues に入れた'jobs'を"カテゴリー化"することができ、さらに各'queues'に割り当てるワーカーの数を優先順位付けすることができます。しかし、これは'jobs'をあなたの'queues'設定ファイルで定義された異なる'queues' "接続"に押し出すものではなく、単一の接続内の特定の'queues'に対してのみ押し出します。

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

あるいは、ジョブのコンストラクタ内で onQueue method を呼び出すことにより、ジョブの queue を指定することもできます。

<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

特定の Connection へのディスパッチ

あなたの application が複数の queue 接続とやり取りを行う場合、onConnection method を使用して job をプッシュする connection を指定できます。

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

onConnectiononQueue の methods を連結して、job の コネクションと queue を指定することができます。

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

あるいは、ジョブのコンストラクタ内でonConnection method を呼び出すことにより、ジョブの connection を指定することもできます:

<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

最大 Job の試行回数 / Timeout Values の指定

最大試行回数

あなたのキューに入っている jobs の一つが error に遭遇している場合、おそらくそれが無限に再試行することは望ましくないでしょう。そのため、 Laravel は、 job が試みられる回数や期間を指定するさまざまな方法を提供しています。

仕事の試行可能最大数を指定する一つの方法は、--triesスイッチを Artisan command ラインに使うことです。これは、処理中の仕事が試行可能な回数を指定しない限り、ワーカーによって処理されるすべての仕事に適用されます。

php artisan queue:work --tries=3

jobs が最大試行回数を超えると、"失敗した" job とみなされます。失敗した job の取り扱いについての詳細は、失敗した job のドキュメンテーションをご覧ください。--tries=0queue:work command に提供された場合、 job は無限に再試行されます。

job class 自体で job が試みることができる最大回数を定義することで、より詳細なアプローチを取ることができます。最大試行回数が job 上で指定されている場合、それは command ライン上で提供される--tries value よりも優先されます。

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}

特定の job の最大試行回数に対して動的な制御が必要な場合は、その job にtries method を定義することができます。

/**
 * Determine number of times the job may be attempted.
 */
public function tries(): int
{
    return 5;
}

時間ベースの試み

job が失敗するまでに何度試行されるかを定義する代わりに、 job が試行されなくなるべき時間を定義することもできます。これにより、特定の時間枠内であれば job を何度でも試行することができます。 job が試行されなくなるべき時間を定義するために、retryUntil method を job class に追加してください。この method はDateTimeのインスタンスを返すべきです:

use DateTime;

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

NOTE

また、queue に入れられた event listenersに対して、triesプロパティまたはretryUntil method を定義することもできます。

Max Exceptions

時々、 job を何度も試行できるように設定したい場合がありますが、リトライが特定の数の未処理の exceptions によってトリガーされた場合には失敗すべきだと考えられます(release method によって直接リリースされるのではなく)。これを実現するために、maxExceptionsプロパティを job class に定義することができます:

<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 25;

    /**
     * The maximum number of unhandled exceptions to allow before failing.
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // Lock obtained, process the podcast...
        }, function () {
            // Unable to obtain lock...
            return $this->release(10);
        });
    }
}

この例では、もし application が Redis ロックを取得できない場合、 job は 10 秒間リリースされ、最大 25 回まで再試行されます。ただし、 job が job によってスローされた 3 つの未処理の exceptions により失敗します。

Timeout

多くの場合、あなたは自分の queue に入れられた jobs がどのくらいの時間かかるかを概算しています。このため、 Laravel では、timeout の value を指定することができます。 default では、 timeout value は 60 秒です。 job が timeout value で指定された秒数よりも長く processing していた場合、その job を processing しているワーカーは error で終了します。通常、ワーカーはあなたの servers で設定された process マネージャーによって自動的に再起動されます。

jobs が実行できる最大秒数は、 Artisan command 行で--timeoutスイッチを使用して指定できます。

php artisan queue:work --timeout=30

job がタイムアウトを繰り返して最大試行回数を超えると、失敗とマークされます。

また、 job class 自体に job が実行を許可される最大秒数を定義することもできます。もし timeout が job 上で指定されている場合、それは command ライン上で指定された任意の timeout よりも優先されます。

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}

場合によっては、ソケットや発信用の HTTP 接続などの IO ブロッキングプロセスがあなたが指定した timeout を尊重しない場合があります。そのため、これらの features を使用するときは、常にそれらの API を使用して timeout を指定するように attempt するべきです。例えば、Guzzle を使用する場合は、常に connection と request timeout value を指定するべきです。

WARNING

pcntl PHP 拡張は、"job"のタイムアウトを指定するためにインストールされている必要があります。さらに、job の"timeout"value は常に、その"retry after"value よりも小さくなければなりません。 さもないと、"job"が実際に実行され終わったりタイムアウトする前に再試行される可能性があります。

Timeout で失敗

job を失敗とマークするべきであることを示したい場合は、タイムアウト時に$failOnTimeoutプロパティを jobclass で定義することができます。

/**
 * Indicate if the job should be marked as failed on timeout.
 *
 * @var bool
 */
public $failOnTimeout = true;

Error Handling

job が処理されている最中に例外がスローされた場合、その job は自動的に queue に戻され、再度試行することができます。 job は、あなたの application で許されている最大試行回数に達するまでリリースされ続けます。最大試行回数は、queue:work Artisan command で使用される--triesスイッチによって定義されます。また、最大試行回数は job class 自体で定義することもできます。 queue ワーカーの実行に関する詳細情報はこちらから参照できます。

手動での Job のリリース

時には、手動で release 、 job を queue に戻し、後で再試行できるようにすることが望ましいかもしれません。これは、release method を呼び出すことで実現できます。

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->release();
}

default では、release method は job を queue にすぐに処理するために戻します。しかし、整数または日付のインスタンスをrelease method に渡すことで、一定の秒数が経過するまで job を処理できるように queue が作動しないように指示することができます:

$this->release(10);

$this->release(now()->addSeconds(10));

手動で Job を失敗させる

たまに、"" job ""を失敗として手動でマークする必要があるかもしれません。その場合は、fail "" method ""を呼び出すことができます。

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->fail();
}

例外をキャッチしたために、 job を失敗としてマークしたい場合、その例外をfail method に渡すことができます。また、便宜上、 string error メッセージを渡すこともできます。これはあなたのために例外に変換されます:

$this->fail($exception);

$this->fail('Something went wrong.');

NOTE

失敗した jobs に関する詳細情報は、job の失敗に対処するためのドキュメンテーションをご覧ください。

Job Batching

Laravel の job バッチ feature を使用すると、簡単に batch の jobs を実行し、batch の jobs が実行を完了したときに何らかの action を実行できます。始める前に、完了率などの job バッチに関する meta 情報を含むテーブルを作成するための database マイグレーションを作成する必要があります。このマイグレーションは、make:queue-batches-table Artisan command を使用して生成できます。

php artisan make:queue-batches-table

php artisan migrate

バッチ可能な Jobs の定義

batch 化可能な job を定義するには、通常どおりに queue 可能な jobを作成します。ただし、Illuminate\Bus\Batchableトレイトを jobclass に追加する必要があります。このトレイトは、batchmethod へのアクセスを提供し、job が実行中の現在の batch を取得するために使用することができます:

<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // Determine if the batch has been cancelled...

            return;
        }

        // Import a portion of the CSV file...
    }
}

バッチのディスパッチ

batch オブ jobs を dispatch するには、 Bus facade の batch method を使用する必要があります。もちろん、batch 処理は完了コールバックと組み合わせて使用することが主に有効です。 したがって、 batch の完了コールバックを定義するために、 thencatch、および finally method を使用することができます。 これらのコールバックは、呼び出されるときに Illuminate\Bus\Batch インスタンスを受け取ります。この例では、CSV ファイルから指定された行数を process する jobs の batch を queue イングしていると想像してみてください:

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
    // A single job has completed successfully...
})->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->dispatch();

return $batch->id;

batch の ID は、$batch->idプロパティを介してアクセスでき、dispatch 後の batch に関する情報について、query という Laravel command バスを使用して問い合わせることができます。

WARNING

batch のコールバックはシリアル化され、後で Laravel queue によって実行されますので、コールバック内で$this変数を使用しないでください。さらに、バッチ化された jobs は database transactions 内でラップされているため、暗黙のコミットを実行する database ステートメントは jobs 内で実行されるべきではありません。

バッチの命名

バッチが名前付けられている場合、 Laravel Horizon や Laravel Telescope のようなツールは、バッチに対するよりユーザーフレンドリーな debug 情報を提供するかもしれません。任意の名前を batch に割り当てるためには、name method を batch を定義する際に呼び出すことができます。

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import CSV')->dispatch();

Batch Connection と Queue

バッチ化された jobs で使用するべき connection と queue を指定したい場合は、onConnection メソッドと onQueue メソッドを使用することができます。すべてのバッチ化された jobs は、同じ connection と queue 内で実行する必要があります:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

チェーンとバッチ

chained jobs のセットを、 array に chained jobs を置くことにより batch 内で定義することができます。例えば、二つの job チェーンを平行して実行し、両方の job チェーンが processing を終了したときにコールバックを実行することができます:

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

逆に、chain内でバッチを定義することにより、一連の jobs を一括で実行できます。例えば、最初に複数のポッドキャストを release するための batch の jobs を実行し、その後 release notifications を送るための batch の jobs を実行できます。

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

バッチに Jobs を追加する

時には、バッチ化された job の中から追加の jobs を batch に加えることが有用な場合があります。これは、数千の jobs を batch 化する必要があり、それが web request の間に dispatch するには時間がかかりすぎる場合に役立ちます。したがって、代わりに、最初の batch を"loader"の jobs で dispatch し、その batch をさらに多くの jobs で補充することを希望するかもしれません:

$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import Contacts')->dispatch();

この例では、LoadImportBatch job を使用して、追加の jobs で batch を補充します。これを達成するためには、ジョブのbatch method を介してアクセス可能な batch インスタンス上のadd method を使用することができます。

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

WARNING

同じ batch に所属する job からのみ、 jobs を batch に追加することができます。

バッチの検査

Illuminate\Bus\Batchインスタンスは、 batch の完了コールバックに提供され、特定の batch の jobs を操作および検査するためのさまざまなプロパティとメソッドを備えています。

// The UUID of the batch...
$batch->id;

// The name of the batch (if applicable)...
$batch->name;

// The number of jobs assigned to the batch...
$batch->totalJobs;

// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;

// The number of jobs that have failed...
$batch->failedJobs;

// The number of jobs that have been processed thus far...
$batch->processedJobs();

// The completion percentage of the batch (0-100)...
$batch->progress();

// Indicates if the batch has finished executing...
$batch->finished();

// Cancel the execution of the batch...
$batch->cancel();

// Indicates if the batch has been cancelled...
$batch->cancelled();

Routes からバッチを返す

すべての Illuminate\Bus\Batch インスタンスは JSON シリアライズ可能であり、つまり、あなたのアプリケーションの routes から直接それらを返すことができます。これにより、 batch に関する情報、特にその完了進捗の情報を含む JSON ペイロードを取得できます。これにより、アプリケーションの UI でバッチの完了進捗情報を表示することが便利になります。

ID によって batch を取得するには、Busfacade のfindBatch method を使用することができます:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

バッチのキャンセル

時々、特定のバッチの実行を cancel する必要があるかもしれません。これは、Illuminate\Bus\Batchインスタンス上のcancel method を呼び出すことで達成できます:

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

前の例でお気づきかもしれませんが、batch 化された job は、その対応する batch が cancel される前に通常決定する必要があります。ただし、便宜上、SkipIfBatchCancelled middleware を job に代入することもできます。その名前が示すように、この middleware は、Laravel に対して、その対応する batch が cancel された場合は job を処理しないよう指示します。

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

Batch 失敗

バッチ処理の job が失敗すると、catchコールバック(割り当てられている場合)が呼び出されます。このコールバックは、 batch 内で最初に失敗した job に対してのみ呼び出されます。

失敗を許可する

batch 内の job が失敗した場合、 Laravel は自動的に batch をキャンセル済にマークします。必要であれば、 job の失敗が自動的に batch をキャンセルしないように、この動作を無効にすることも可能です。これは、 batch をディスパッチしている間に、allowFailures method を呼び出すことで実現できます。

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->allowFailures()->dispatch();

失敗した Batch Jobs の再試行

便利のために、Laravel はqueue:retry-batch Artisan command を提供しており、特定の batch の失敗した jobs すべてを簡単に retry することができます。 queue:retry-batch command は、retry するための失敗した jobs を持つ batch の UUID を受け付けます。

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

バッチの剪定

job_batchesテーブルが非常に迅速にレコードを蓄積することがあります。これを軽減するためには、queue:prune-batchesの Artisan command をscheduleして、毎日実行すべきです。

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

default では、24 時間以上前に終了したすべての batches が整理されます。 hours option を使って、command を呼び出す際に batchesdata を保持する時間を決定することができます。例えば、次の command は、48 時間前に終了したすべての batches を削除します:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

時折、あなたの jobs_batches テーブルは、成功して完了しなかったバッチの batch レコードを蓄積する可能性があります。例えば、 job が失敗し、その job が成功するまで再試行されなかったバッチなどです。これら未完の batch レコードを削除するために、あなたはqueue:prune-batches command にunfinishedオプションを使用して指示することができます:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同様に、あなたの jobs_batches テーブルはキャンセルされたバッチの batch レコードも蓄積するかもしれません。 queue:prune-batches command を指示して、cancelled オプションを使ってこれらのキャンセルされた batch レコードを削除することができます:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

バッチを DynamoDB に保存する

Laravel は、リレーショナルな database の代わりに、DynamoDB に batch meta 情報を格納するサポートも提供しています。ただし、 batch レコードをすべて格納するための DynamoDB テーブルを手動で作成する必要があります。

通常、このテーブルはjob_batchesと名付けるべきですが、application のqueue設定ファイル内のqueue.batching.table設定の value に基づいてテーブルに名前をつけるべきです。

DynamoDB Batch テーブル設定

job_batchesテーブルは、applicationという名前の string primary パーティションキーと、idという名前の string primary ソートキーを持つべきです。キーのapplication部分には、アプリケーションのapp設定ファイル内のname設定 value で定義されたアプリケーションの名前が含まれるでしょう。 application 名は DynamoDB テーブルのキーの一部であるため、同じテーブルを使用して複数の Laravel アプリケーションの job バッチを保存することができます。

さらに、自動的な batch 剪定を活用したい場合は、テーブルに対してttl attribute を定義することもできます。

DynamoDB の設定

次に、あなたの Laravel application が Amazon の DynamoDB と通信できるように、AWS SDK をインストールします。

composer require aws/aws-sdk-php

次に、queue.batching.driver設定オプションの value をdynamodbに設定します。さらに、keysecret、およびregion設定の options をbatching設定の array 内に定義する必要があります。これらの options は、AWS との認証に使用されます。dynamodb driver を使用する場合、queue.batching.database設定オプションは不要です。

'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

DynamoDB でのバッチの剪定

DynamoDB を使用して job batch 情報を保存する場合、リレーショナル database に保存された batches を剪定するために使用される一般的な剪定 commands は機能しません。代わりに、DynamoDB のネイティブ TTL 機能 を利用して、古い batches のレコードを自動的に削除することができます。

DynamoDB テーブルを ttl attribute で定義した場合、batch レコードを削除する方法を Laravel に指示するための設定パラメータを定義できます。queue.batching.ttl_attribute 設定 value は TTL を保持する attribute の名前を定義し、queue.batching.ttl 設定 value はレコードが更新されてからどのくらいの時間が経過した後に batch レコードを DynamoDB テーブルから削除できるかを定義します。

'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

Queueing Closures

job class を queue に dispatch する代わりに、クロージャを dispatch することもできます。これは、現在の request サイクルの外部で実行する必要がある速やかで単純なタスクに最適です。クロージャを queue に dispatch する際、クロージャの code content は、転送中に変更されないように暗号化されて署名されます。

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

catch method を使用すると、queue の設定された retry の試みをすべて使い果たした後に、queue のクロージャが正常に完了しなかった場合に実行されるべきクロージャを提供できます。

use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // This job has failed...
});

WARNING

catchのコールバックはシリアル化され、後で Laravel queue によって実行されるため、catchコールバック内で$this変数を使用すべきではありません。

Running the Queue Worker

queue:work Command

Laravel には Artisan command が含まれており、queue 上に新しい jobs が追加されると、queue の作業者を起動して新しい jobs を process します。queue:work Artisan command を使用して作業者を実行することができます。queue:work command を開始すると、手動で停止するかターミナルを閉じるまで実行し続けることに注意してください。

php artisan queue:work

NOTE

queue:work process をバックグラウンドで常に実行し続けるためには、 process 監視ツールであるSupervisorのようなものを使用して、 queue ワーカーが実行を停止しないようにするべきです。

コマンドの output に処理済みの job ID を含めたい場合、queue:work command を呼び出す際に、-v フラグを含めることができます。

php artisan queue:work -v

覚えておいてください、 queue のワーカーは長時間生き続けるプロセスで、起動した application state をメモリに保存します。したがって、ワーカーが起動した後では、 code ベースの変更は認識しないでしょう。したがって、 deployment process の間には、必ずqueue のワーカーを再起動してください。さらに、あなたの application が作成または変更した任意の静的な state は、 jobs の間に自動的に reset されないことを覚えておいてください。

あるいは、queue:listenの command を実行することもできます。queue:listenの command を使用すると、更新されたコードをリロードしたり、application の状態を reset したいときに、手動でワーカーを再起動する必要はありません。しかし、この command はqueue:workの command よりも効率が著しく低いです。

php artisan queue:listen

複数の Queue ワーカーの実行

複数のワーカーを queue に割り当てて process jobs concurrently するためには、単純に複数のqueue:workプロセスを開始するだけです。これは、ターミナルの複数のタブをローカルで使用するか、または production で process マネージャーの設定を使用して行うことができます。Supervisor を使用する場合numprocs設定の value を使用することができます。

Connection と Queue を指定する

また、ワーカーが使用するべき queue connection を指定することもできます。work command に渡された connection 名は、config/queue.php設定ファイルで定義された接続のいずれかに対応するべきです:

php artisan queue:work redis

default として、queue:work command は、特定の接続上の defaultqueue のみの jobs を処理します。ただし、特定の接続のための特定の queue のみを処理することにより、queue ワーカーをさらにカスタマイズすることができます。例えば、すべての mail が、あなたのredis queue 接続上のemails queue で処理される場合、次の command を発行して、その queue のみを処理するワーカーを開始することができます:

php artisan queue:work redis --queue=emails

Processing という特定の数の Jobs の処理

--onceオプションは、ワーカーに対して queue から single job を process するよう指示するために使用できます:

php artisan queue:work --once

--max-jobsoption は、指定された数の jobs を処理した後にワーカーに終了を指示するために使用できます。この option はSupervisorと組み合わせることで、ワーカーが特定の数の jobs の処理を完了した後に自動的に再開され、蓄積されたメモリを解放する際に役立つかもしれません。

php artisan queue:work --max-jobs=1000

Processing すべてのキューに入れられた Jobs そしてその後終了

--stop-when-empty option は、全ての jobs を process した後にワーカーに対して正常終了するよう指示するために使用できます。Docker コンテナ内で Laravel queues を processing していて、 queue が空になった後にコンテナをシャットダウンしたい場合に便利な option です:

php artisan queue:work --stop-when-empty

特定の秒数での Processing Jobs

--max-timeオプションは、指定された秒数だけ processing jobs を行い、その後で終了するようにワーカーに指示するために使用できます。このオプションは、ワーカーが指定された時間だけ process jobs を行った後に自動的に再起動され、蓄積したメモリが解放されるように、Supervisorと組み合わせて使用すると便利です。

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

ワーカーの Sleep 時間

jobs が queue に利用可能な時、ワーカーは jobs の processing jobs を delay なしに続けます。ただし、sleepオプションは、利用可能な jobs がない場合にワーカーが眠る秒数を決定します。もちろん、眠っている間、ワーカーは新しい jobs を process しません:

php artisan queue:work --sleep=3

Maintenance Mode and Queues

あなたの application がmaintenance modeにある間、キューに入った jobs は処理されません。 jobs は、 application がメンテナンスモードから戻った後、通常どおりに処理されます。

--forceオプションを使用して、メンテナンスモードが有効でも queue のワーカーに process jobs させることができます。

php artisan queue:work --force

Resource の考慮事項

Daemon queue ワーカーは、各 job を processing する前にフレームワークを "reboot"しません。したがって、各 job が終了した後は、重い resources を release する必要があります。例えば、GD library を用いて image 操作を行っている場合は、 image の processing が終了したら imagedestroy でメモリを解放すべきです。

Queue の優先度

時折、どのように queues が処理されるかを優先したい場合があります。例えば、config/queue.phpの設定ファイルで、redisの connection に対する default のqueuelowに設定することができます。しかし、たまには、以下のようにhighの優先度を持つ queue に job をプッシュしたいこともあるでしょう。

dispatch((new Job)->onQueue('high'));

high queue jobs がすべて処理されてから、 low queue の jobs に進むことを確認するワーカーを開始するには、 work command にカンマで区切られた queue の名前のリストを渡します。

php artisan queue:work --queue=high,low

Queue ワーカーと Deployment

queue ワーカーは長寿命なプロセスなので、再起動されるまであなたの code の変更に気づかないでしょう。よって、 queue ワーカーを使って application をデプロイする最もシンプルな方法は、 deployment process の間にワーカーを再起動することです。 queue:restart command を発行することで、すべてのワーカーを優雅に再起動することができます。

php artisan queue:restart

この command は、すべての queue ワーカーに対して、現在の job の processing が終了したら、既存の jobs が失われないように、優雅に終了するように指示します。 queue ワーカーは queue:restart command が実行されると終了するので、 queue ワーカーを自動的に再起動するために、 Supervisor のような process マネージャーを実行している必要があります。

NOTE

queue は再起動シグナルを保存するためにcacheを使用します。したがって、この feature を使用する前に、 application の cache driver が適切に設定されていることを確認するべきです。

Job の有効期限とタイムアウト

Job の有効期限

config/queue.php 設定ファイルでは、各 queue connection が retry_after オプションを定義しています。このオプションは、queue connection が job の再試行を行う前にどのくらいの秒数待機するかを指定します。例えば、retry_after の value が 90 に設定されている場合、job が 90 秒間処理され、リリースまたは削除されなかった場合、queue に再度リリースされます。通常、retry_after の value を、jobs が完了するのに合理的にかかる最大秒数に設定する必要があります。

WARNING

retry_after の value を持たない唯一の queue 接続は Amazon SQS です。SQS は、AWS の console 内で管理されているDefault の視認性タイムアウト に基づいて job を再試行します。

ワーカータイムアウト

queue:workの Artisan command には--timeoutoption があります。default では、--timeoutの value は 60 秒です。もし job の処理がタイムアウト値で指定された秒数より長く続く場合、その job を処理しているワーカーは error で終了します。通常、ワーカーはあなたの servers に設定された process マネージャーによって自動的に再起動されます。

php artisan queue:work --timeout=60

retry_after設定オプションと--timeout CLI オプションは異なりますが、一緒に動作して jobs が失われないことと、 jobs が一度だけ正常に処理されることを確保します。

WARNING

--timeoutの value は、常にretry_afterの設定 value よりも少なくとも数秒短くなければなりません。これにより、固まった job を processing しているワーカーが、 job が再試行される前に常に終了することが保証されます。もし--timeoutオプションがretry_afterの設定 value よりも長い場合、 jobs が二度処理される可能性があります。

Supervisor Configuration

production では、queue:work timeout を稼働し続ける方法が必要です。queue:work process は、ワーカータイムアウト を超過したり、queue:restart command の実行など、様々な理由で稼働を停止することがあります。

このため、あなたの queue:work プロセスが終了したときに検出し、自動的に再起動できる process モニターを設定する必要があります。さらに、 process モニターは、あなたが concurrently で実行したい queue:work プロセスの数を指定できます。Supervisor は、Linux environments で一般的に使用される process モニターであり、次のドキュメントでその設定方法について説明します。

Installing Supervisor

Supervisor は Linux オペレーティングシステムの process 監視ツールであり、queue:workプロセスが失敗した場合は自動的に再起動します。Ubuntu に Supervisor をインストールするには、次の command を使用することができます:

sudo apt-get install supervisor

NOTE

Supervisor 自体の設定と管理が厳しそうだと思われる方は、Laravel Forge の使用を検討してみてください。これにより、 production Laravel のプロジェクトに対して Supervisor が自動的にインストールおよび設定されます。

Supervisor の設定

Supervisor の構成ファイルは通常、/etc/supervisor/conf.dディレクトリに保存されます。このディレクトリ内では、プロセスがどのように監視されるべきかを Supervisor に指示する任意の数の設定ファイルを作成することが可能です。例えば、queue:workプロセスを起動し監視するlaravel-worker.confファイルを作成してみましょう:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

この例では、numprocsディレクティブは Supervisor に対し、8 つのqueue:workプロセスを実行し、それらすべてを監視するよう指示します。それらが失敗した場合、自動的に再起動します。設定のcommandディレクティブは、あなたが望む queue connection とワーカーの options を反映するように変更すべきです。

WARNING

stopwaitsecs の value が最長の job の実行にかかる秒数よりも大きいことを確認してください。そうでない場合、Supervisor は job が processing を終える前に殺してしまうかもしれません。

Starting Supervisor

設定ファイルが作成されたら、次のコマンドを使用して Supervisor の設定を update し、プロセスを起動することができます:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

詳しくは、Supervisor のドキュメント をご覧ください。

Dealing With Failed Jobs

時折、キューに入れた jobs が失敗します。心配しないでください、物事が常に計画通りに進むわけではありません! Laravel には、job が試行される最大回数を指定する 便利な方法が含まれています。非同期の job がこの試行回数を超えると、それは failed_jobs database テーブルに挿入されます。同期的にディスパッチされた jobsは、失敗してもこのテーブルには保存されず、その exceptions はすぐに application によって処理されます。

failed_jobsテーブルを作成する migration は、新規の Laravel アプリケーションには通常すでに存在します。しかし、もし、あなたの application がこのテーブル用の migration を含んでいない場合、make:queue-failed-table command を使用して migration を作成することができます:

php artisan make:queue-failed-table

php artisan migrate

queue worker process を実行する際に、 --tries スイッチを使用して、 job が試行される最大回数を指定することができます。 queue:work command の --tries オプションに value を指定しない場合、 jobs は一回だけ試行されるか、 job クラスの $tries プロパティで指定された回数だけ試行されます。

php artisan queue:work redis --tries=3

--backoffoption を使用すると、 Laravel が例外に遭遇した job を再試行するまで何秒待つべきかを指定できます。 default では、 job はすぐに queue に戻されて再試行できるようになります:

php artisan queue:work redis --tries=3 --backoff=3

Laravel が例外に遭遇した job を再試行するまでに待つべき秒数をジョブごとに設定したい場合は、backoffプロパティを job class に定義することで行うことができます:

/**
 * The number of seconds to wait before retrying the job.
 *
 * @var int
 */
public $backoff = 3;

もし、ジョブの backoff 時間を決定するためのより複雑なロジックが必要な場合、 job class にbackoff method を定義することができます。

/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
    return 3;
}

backoffmethod からの array を返すことで、"指数関数的"なバックオフを簡単に設定できます。この例では、最初のリトライのためのリトライ遅延は 1 秒、2 回目のリトライは 5 秒、3 回目のリトライは 10 秒、それ以降の残りの試行回数がある場合のすべてのリトライは 10 秒となります。

/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
    return [1, 5, 10];
}

失敗した Jobs の後始末

特定の job が失敗した場合、 users に alert を送信したり、 job によって部分的に完了したアクションを取り消すことを望むかもしれません。これを達成するために、 job class 上に failed method を定義することができます。 job を失敗させた原因となる Throwable インスタンスが failed method に渡されます:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }

    /**
     * Handle a job failure.
     */
    public function failed(?Throwable $exception): void
    {
        // Send user notification of failure, etc...
    }
}

WARNING

failedメソッドを呼び出す前に、新しい job のインスタンスが生成されます。したがって、handle method 内で発生したかもしれない class プロパティの変更は失われます。

失敗した Jobs の再試行

failed_jobs database テーブルに挿入されたすべての失敗した jobs を表示するには、queue:failed Artisan command を使用できます。

php artisan queue:failed

queue:failed command は、job の ID、接続、queue、失敗時刻、及びその他の情報をリストします。job の ID は、失敗した job を再試行するために使用することができます。例えば、ID がce7bb17c-cdd8-41f0-a8ec-7b4fef4e5eceの失敗した job を再試行するには、次の command を入力します。

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

必要に応じて、複数の ID を command に渡すことができます:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

また、特定の queue に対して失敗したすべての jobs を retry することもできます:

php artisan queue:retry --queue=name

すべての失敗した jobs を retry するには、queue:retry command を実行し、ID としてallを渡します:

php artisan queue:retry all

失敗した job を delete したい場合は、queue:forgetという command を使用できます:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

NOTE

Horizonを使用するとき、失敗した job を delete するためには、queue:forget command の代わりにhorizon:forget command を使うべきです。

failed_jobsテーブルからすべての失敗した jobs を delete するために、queue:flush command を使用することができます:

php artisan queue:flush

不足している Models を無視する

Eloquent model を job に注入すると、 model は自動的にシリアル化されて queue に置かれ、 job が処理されるときに database から再取得されます。しかし、 job がワーカーによって処理を待っている間に model が削除されていた場合、 job は ModelNotFoundException で失敗する可能性があります。

便宜上、ジョブの deleteWhenMissingModels プロパティを true に設定して、欠落している models を伴う job を自動的に 削除することを選択することができます。このプロパティが true に設定されている場合、Laravel は例外を発生させずに job を静かに破棄します。

/**
 * Delete the job if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

Jobs の剪定に失敗しました

failed_jobs テーブルのレコードを剪定するために、queue:prune-failed Artisan command を呼び出すことができます:

php artisan queue:prune-failed

default によれば、24 時間以上前のすべての失敗した job 記録は剪定されます。もし--hoursoption を command に与えると、過去 N 時間以内に挿入された失敗した job 記録のみが保持されます。例えば、以下の command は 48 時間以上前に挿入されたすべての失敗した job 記録を削除します:

php artisan queue:prune-failed --hours=48

失敗した Jobs を DynamoDB に保存する

Laravel は、失敗した job の記録をリレーショナル database のテーブルではなく、DynamoDB に保存するサポートも提供しています。ただし、失敗した job の記録をすべて保存するための DynamoDB のテーブルは手動で作成する必要があります。通常、このテーブルは failed_jobs と名付けるべきですが、application の queue 設定ファイル内の queue.failed.table 設定の value に基づいてテーブルの名前を付けるべきです。

failed_jobsテーブルは、applicationという名前の string primary パーティションキーと、uuidという名前の string primary ソートキーを持つべきです。キーのapplication部分には、アプリケーションのapp設定ファイル内のname設定 value によって定義されるアプリケーションの名前が含まれます。 application 名は DynamoDB テーブルのキーの一部であるため、複数の Laravel アプリケーションの失敗した jobs を同じテーブルに保存することができます。

また、AWS SDK をインストールして、あなたの Laravel application が Amazon の DynamoDB と通信できるように確認してください:

composer require aws/aws-sdk-php

次に、queue.failed.driverの設定オプションの value をdynamodbに設定します。さらに、失敗した job の設定 array 内に、keysecret、およびregionの設定 options を定義する必要があります。これらの options は、AWS への認証に使用されます。dynamodbの driver を使用する場合、queue.failed.databaseの設定オプションは不要です:

'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

失敗した Job Storage の無効化

queue.failed.driver設定オプションの value をnullに設定することで、 Laravel に失敗した jobs を保存せずに破棄するよう指示できます。通常、これはQUEUE_FAILED_DRIVER environment 変数を通じて達成できます:

QUEUE_FAILED_DRIVER=null

失敗した Job Events

job が失敗した時に呼び出される event リスナーを register したい場合は、Queueファサードのfailing method を使用できます。例えば、 Laravel に含まれているAppServiceProviderboot method からこの event にクロージャを attach することができます。

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

Clearing Jobs From Queues

NOTE

Horizon を使用するときは、jobs を queue からクリアするために queue:clear command の代わりに horizon:clear command を使用すべきです。

もし全ての jobs を default connection の default queue から delete したい場合、queue:clearの Artisan command を使用して行うことができます:

php artisan queue:clear

また、特定の connection と queue から delete jobs を削除するために、connection引数と queueオプションを提供することもできます:

php artisan queue:clear redis --queue=emails

WARNING

SQS、 Redis 、および database queue ドライバーのみが queues からの jobs の削除を利用できます。さらに、SQS メッセージ削除 process は最大 60 秒かかるため、 queue をクリアした後の 60 秒以内に SQS queue に送信された jobs も削除される可能性があります。

Monitoring Your Queues

あなたの queue が急激に queue job を受け取ると、その処理能力を超えてしまい、 jobs の完了までの待ち時間が長くなる可能性があります。もしよければ、 jobs が指定した閾値を超えたときに、 Laravel を使って alert を出すことができます。

始めるには、queue:monitorの command を毎分実行するように schedule する必要があります。 command は、監視したい queues の名前と、希望する job のカウントのしきい values を受け入れます:

php artisan queue:monitor redis:default,redis:deployments --max=100

この command をスケジュールするだけでは、キューが溢れた status を示す notification をトリガーするには十分ではありません。 command が job の数があなたの閾値を超えている queue に遭遇したとき、Illuminate\Queue\Events\QueueBusy event がディスパッチされます。この event は、アプリケーションの AppServiceProvider でリッスンして、あなたやあなたの開発チームに notification を送ることができます。

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

Testing

job を dispatches する testing コードを testing する際には、job のコードを直接、そしてそれを dispatches するコードから切り離して testing することができるため、Laravel に対して実際に job を実行しないよう指示することが望ましいかもしれません。もちろん、job そのものを testing するためには、job のインスタンスを生成し、testing でhandle method を直接呼び出すこともできます。

Queuefacade のfakemethod を使用して、キューに実際にプッシュされる jobs を防ぐことができます。Queuefacade のfakemethod を呼び出した後、application が jobs を queue にプッシュしようとしたことを assert することができます:

<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // Perform order shipping...

    // Assert that no jobs were pushed...
    Queue::assertNothingPushed();

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);

    // Assert a job was not pushed...
    Queue::assertNotPushed(AnotherJob::class);

    // Assert that a Closure was pushed to the queue...
    Queue::assertClosurePushed();

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(3);
});
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushed(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a Closure was pushed to the queue...
        Queue::assertClosurePushed();

        // Assert the total number of jobs that were pushed...
        Queue::assertCount(3);
    }
}

assertPushedまたはassertNotPushedmethods にクロージャを渡すことで、特定の"真実 test"に合格する"job"がプッシュされたことを"主張"できます。少なくとも 1 つの"job"が指定された真実 test に合格してプッシュされた場合、その主張は成功となります:

Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

Jobs の一部を偽装する

他の jobs を通常通り実行させつつ、特定の jobs だけを faking する必要がある場合、faking すべき jobs の class 名をfake method に渡すことができます:

test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
}

あなたはexceptの method を使用して、特定のセットの jobs を除いて、すべての jobs を fake することができます。

Queue::fake()->except([
    ShipOrder::class,
]);

Testing Job チェーン

job チェーンを test するには、Bus facade のフェイキング機能を利用する必要があります。Bus facade の assertChained method を使用して、jobs のチェーンが dispatch されたことを assert することができます。assertChained method は、その最初の引数としてチェーンされた jobs の array を受け入れます:

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

上記の例で見ることができるように、連鎖した jobs の array は、ジョブの class 名の array であるかもしれません。しかし、実際の job インスタンスの array を提供することも可能です。 Laravel がその場合には、提供された job インスタンスが同じ class であり、連鎖した jobs があなたの application によって発行されたときと同じプロパティ values を持っていることを確認します。

Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

assertDispatchedWithoutChainの method を使用して、チェーンなしの job がプッシュされたことを assert することができます。

Bus::assertDispatchedWithoutChain(ShipOrder::class);

Testing チェーンの変更

もし連鎖された job が既存のチェーンに jobs を先頭または末尾に追加する場合、その job が残りの jobs の期待されるチェーンを持っていることを assert するために job のassertHasChainmethod を使用することができます:

$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain method は、job の残りのチェーンが空であることを主張するために使用できます。

$job->assertDoesntHaveChain();

Testing チェーン化されたバッチ

もし、あなたの job チェーンがbatch の jobs を含んでいる場合、チェーンアサーション内にBus::chainedBatchの定義を挿入することで、連鎖した batch があなたの期待通りであると assert することができます。

use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

Testing Job バッチ

Bus facade の assertBatched method は、jobs の batchが dispatch されたことを assert するために使用できます。assertBatched method に渡されるクロージャは Illuminate\Bus\PendingBatch のインスタンスを受け取り、それを使用して batch 内の jobs を検査することができます:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

指定された batches 数が dispatch されたことをassertBatchCountmethod で確認することができます:

Bus::assertBatchCount(3);

assertNothingBatched を使って、バッチが一つもディスパッチされていないことを assert することができます。

Bus::assertNothingBatched();

Testing Job / Batch インタラクション

また、たまに個々のジョブがその基になる batch とどのように相互作用するかのテストが必要になることがあります。たとえば、 job がその batch に対するさらなる processing をキャンセルしたかどうかをテストする必要があるかもしれません。これを達成するには、withFakeBatch method を使って job に対して fake batch を割り当てる必要があります。withFakeBatch method は、 job インスタンスと fake batch を含むタプルを返します。

[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

Testing Job / Queue の相互作用

時々、queue の中にある job が自身を queue に戻すことを確認する必要があるかもしれません。または、job が自身を削除することを確認する必要があるかもしれません。これらの queue とのやり取りを test するために、job をインスタンス化し、withFakeQueueInteractions method を呼び出すことができます。

ジョブの queue インタラクションが偽装されたら、 job の上でhandle method を呼び出すことができます。 job を呼び出した後、assetReleasedassertDeleted、およびassertFailedメソッドを使用して、ジョブの queue インタラクションに対するアサーションを行うことができます:

use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertFailed();

Job Events

beforeafterメソッドをQueuefacade上で使用すると、キューの job が処理される前後に実行されるコールバックを指定することができます。これらのコールバックは、ダッシュボードのための追加の logging や increment 統計を実行する絶好の機会です。通常、これらのメソッドは、service providerboot method から呼び出すべきです。例えば、 Laravel に含まれるAppServiceProviderを使用することができます:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

looping method を Queue facadeに使用することで、ワーカーが queue から job を取得しようとする前に実行するコールバックを指定できます。たとえば、以前に失敗した job によって開かれたままの transactions を rollback するクロージャを register することができます。

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});

当社サイトでは、Cookie を使用しています。各規約をご確認の上ご利用ください:
Cookie Policy, Privacy Policy および Terms of Use