Dustin LeBlanc

Dustin LeBlanc

· 2 min read

Keeping all the balls in the air

Async Laravel with jobs and events

Interesting software often needs to do some time-or-cpu intensive tasks. Whether it's sending an email, starting an automation script, or something else, if the user doesn't need to see the result before showing them a web page, it should probably be done in an asynchronous job.

Photo by Yi Liu on Unsplash

Laravel developers are living the good life, and at times we don't realize how good. Other frameworks (or CMS systems) don't have the good fortune to be built with all the tools packed into Laravel. While systems like Drupal have a queue and batch system, I find that working with these systems in Laravel (and on infrastructure that is used to supporting Laravel), it is much easier to write software that makes use of them.

When working on Dropship CI, we made heavy use of Laravel's queue system. Almost everything that powered the system ran on a combination of queued jobs and a Kubernetes cluster that was being orchestrated by the application. When a commit was pushed to Github, the results in Dropship looked like this:

A simplified diagram of a build job starting in Dropship CI

In this example, the WebhookController's only job is to receive webhooks, verify their authenticity, save them to the database, and then queue up a job to decide what to do with them. Github gets a response back almost immediately, which is important when your app may be getting thousands of such requests an hour. The controller class is only 31 lines long, including import statements:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessWebhook;
use App\Models\WebhookReceipt;
use Illuminate\Http\Request;
use Illuminate\Http\Response;

class WebhookController extends Controller
{
    public function handle(Request $request)
    {
        // Validate the request is from our Github Webhook.
        $sha1 = hash_hmac(
            'sha1',
            $request->getContent(),
            env('GITHUB_WEBHOOK_SECRET')
        );
        $signature = "sha1={$sha1}";
        if (!hash_equals($request->header('X-Hub-Signature'), $signature)) {
            return new Response('Invalid Request', 500);
        }
        $receipt = WebhookReceipt::create([
            'event_name' => $request->header('X-GitHub-Event') ?? 'push',
            'event_data' => $request->getContent()
        ]);
        ProcessWebhook::dispatch($receipt);
        return new Response('This hook was legit!', 200);
    }
}

The code that processes that webhook is a lot more complex, depending on which event we're receiving from Github, we may want to start a build, save some new repository metadata, delete some authorizations, etc.

None of this matters to serving back the response though. If we made the request wait for everything that just started as a result of the hook, it would probably be waiting for more than 10 minutes for most builds.

The magic of Laravel's jobs system comes in the two statements before the last return:

$receipt = WebhookReceipt::create([
    'event_name' => $request->header('X-GitHub-Event') ?? 'push',
    'event_data' => $request->getContent()
]);
ProcessWebhook::dispatch($receipt);

These five lines are first saving a new model to the database (called WebhookReciept), and then passing that model immediately into a new ProcessWebhook queued job. The details of this job are stored in a Redis queue, waiting for our worker process to come along and pluck the item out of the queue and work on it.

The job class is a bit more complex, here is a truncated version:

<?php

namespace App\Jobs;

use App\Models\Build;
use App\Events\Builds\BuildCreated;
use App\Models\Project;
use App\Models\WebhookReceipt;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ProcessWebhook implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    /**
     * @var WebhookReceipt
     */
    public $webhookReceipt;

    /**
     * Create a new job instance.
     *
     * @param WebhookReceipt $webhookReceipt
     */
    public function __construct(WebhookReceipt $webhookReceipt)
    {

        $this->webhookReceipt = $webhookReceipt;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        $handlers = [
            'installation' => 'handleInstallationEvent',
            'installation_repositories' => 'handleInstallationUpdateEvent',
            'push' => 'handlePushEvent'
        ];
        return array_key_exists($this->webhookReceipt->event_name, $handlers) ? call_user_func(
            [
                $this,
                $handlers[$this->webhookReceipt->event_name]
            ]
        ) : null ;
    }
  
    protected function handlePushEvent()
    {
        $eventData = $this->webhookReceipt->event_data;
        // Immediately bail on branch deletions.
        if ($eventData->deleted === true) {
            return;
        }
        $repository = GitRepository::where('provider_repo_id', $eventData->repository->id)->firstOrFail();
        $project = Project::where(['git_repository_id' => $repository->id])->firstOrFail();
        $previousBuilds = $project->builds->sortByDesc('project_build_number');
        $buildNumber = count($previousBuilds) >= 1 ? $previousBuilds->first()->project_build_number + 1 : 1;

        // create new build in db
        $build = Build::create([
            'project_id' => $project->id,
            'branch' => $eventData->ref ?? 'refs/head/master',
            'commit_id' => $eventData->head_commit->id ?? '',
            'project_build_number' => $buildNumber,
            'previous_state' => $project->lastBuild->state ?? '',
            'created_by' => $eventData->pusher->email,
            'state' => 'Pending',
            'summary' => $eventData->head_commit->message ?? '',
        ]);
        BuildCreated::dispatch($build);
    }
}

I've removed all the handlers except the push event handler to keep things a bit more simple. You can see the bones of a Laravel job here though, take a look at the constructor:

    /**
     * Create a new job instance.
     *
     * @param WebhookReceipt $webhookReceipt
     */
    public function __construct(WebhookReceipt $webhookReceipt)
    {

        $this->webhookReceipt = $webhookReceipt;
    }

Here is where we pass in the webhook receipt when the job is created. In Laravel, the constructor of the job class gets called when the job gets saved to the queue, so set up any data you need to store with the job here so that the job can access it when it starts.

The rest of the job class is concerned with doing the work. we need a main entry point that is executed by the handle method on the job:

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        $handlers = [
            'installation' => 'handleInstallationEvent',
            'installation_repositories' => 'handleInstallationUpdateEvent',
            'push' => 'handlePushEvent'
        ];
        return array_key_exists($this->webhookReceipt->event_name, $handlers) ? call_user_func(
            [
                $this,
                $handlers[$this->webhookReceipt->event_name]
            ]
        ) : null ;
    }

This generic handle method is abstracted so that it can deal with a multitude of different events. The $handlers array maps the event name (the keys) to a specific method to call (the values), and then we handoff to the dedicated handler,

The meat of a push event is handled in the handlePushEvent method, which looks like this:

protected function handlePushEvent()
{
    $eventData = $this->webhookReceipt->event_data;
    // Immediately bail on branch deletions.gi
    if ($eventData->deleted === true) {
      return;
    }
    $repository = GitRepository::where('provider_repo_id', $eventData->repository->id)->firstOrFail();
    $project = Project::where(['git_repository_id' => $repository->id])->firstOrFail();
    $previousBuilds = $project->builds->sortByDesc('project_build_number');
    $buildNumber = count($previousBuilds) >= 1 ? $previousBuilds->first()->project_build_number + 1 : 1;

    // create new build in db
    $build = Build::create([
      'project_id' => $project->id,
      'branch' => $eventData->ref ?? 'refs/head/master',
      'commit_id' => $eventData->head_commit->id ?? '',
      'project_build_number' => $buildNumber,
      'previous_state' => $project->lastBuild->state ?? '',
      'created_by' => $eventData->pusher->email,
      'state' => 'Pending',
      'summary' => $eventData->head_commit->message ?? '',
    ]);
    BuildCreated::dispatch($build);
}

You can see this job ends similarly to how we ended the webhook receipt controller but with a small difference. After creating the build model, we dispatch an event rather than dispatching another job. In this case, we're taking advantage of another handy asynchronous system in laravel so that we can have both our job system and our web client listen for builds to be created. We can talk about how Laravel Echo lets us listen to a websocket server to send users push notifications in real-time in a future post, but for now, we're just going to start another series of jobs as a response to the event.

<?php

namespace App\Listeners\Builds;

use App\Jobs\BuildSteps\BuildTheme;
use App\Jobs\BuildSteps\CloneRepo;
use App\Jobs\BuildSteps\LintCode;
use App\Jobs\BuildSteps\PrepareForPantheon;
use App\Jobs\BuildSteps\TerminusAddSshKey;
use App\Jobs\BuildSteps\DeployToPantheon;
use App\Jobs\BuildSteps\InstallDeps;
use App\Events\Builds\BuildCreated;
use App\Jobs\BuildSteps\TerminusAuth;
use App\Jobs\BuildSteps\TerminusCleanUpPullRequests;
use App\Jobs\BuildSteps\TerminusClearDrupalCache;
use App\Jobs\BuildSteps\TerminusClearEnvCache;
use App\Jobs\BuildSteps\TerminusConfigImport;
use App\Jobs\BuildSteps\TerminusUpdateDatabase;
use App\Jobs\CleanupBuild;
use App\Services\PodManager;

class StartBuild
{
    /**
     * @var PodManager
     */
    private $podManager;

    /**
     * Create the event listener.
     *
     * @param  PodManager  $podManager
     */
    public function __construct(PodManager $podManager)
    {
        $this->podManager = $podManager;
    }

    /**
     * Handle the event.
     *
     * @param BuildCreated $event
     * @return void
     */
    public function handle(BuildCreated $event)
    {
        $build = $event->build->resource;
        $build->state = "Pending";
        $secret = $this->podManager->generateSecret($build);
        $pod = $this->podManager->startPodFromBuild($build);
        $build->pod_data = $pod;
        $build->secret_data = $secret;
        $build->save();
        while (!$this->podManager->podIsUp($pod)) {
            sleep(1);
        }
        $chain = [
            'composer_install' => [
                ['step' => new InstallDeps($build), 'weight' => 0],
            ],
            'lint_code' => [
                ['step' => new LintCode($build), 'weight' => 1]
            ],
            'theme_compile' => [
                ['step' => new BuildTheme($build), 'weight' => 2]
            ],
            'deploy_to_pantheon' => [
                ['step' => new TerminusAuth($build), 'weight' => 3],
                ['step' => new TerminusAddSshKey($build), 'weight' => 4],
                ['step' => new PrepareForPantheon($build), 'weight' => 5],
                ['step' => new DeployToPantheon($build), 'weight' => 6],
                ['step' => new TerminusUpdateDatabase($build), 'weight' => 7],
                ['step' => new TerminusConfigImport($build), 'weight' => 8],
                ['step' => new TerminusClearDrupalCache($build), 'weight' => 9],
                ['step' => new TerminusClearEnvCache($build), 'weight' => 10],
                ['step' => new TerminusCleanUpPullRequests($build), 'weight' => 11],
            ],
            'default' => [
                ['step' => new CleanupBuild($build), 'weight' => 12],
            ]
        ];
        $doing = collect($build->project->build_steps)->flatMap(function ($step) use ($chain) {
            return $chain[$step];
        })->unique()->values()->sortBy('weight')->pluck('step')->all();
        CloneRepo::withChain($doing)->dispatch($build)->allOnQueue('builds');
    }
}

Here we have an event listener for our BuildCreated event. To explain what is going on here, first, we set the state of the build to pending, then we ask our Kubernetes cluster to start a new pod for us, we save the metadata about that pod back to the job, and then wait for the pod to come up in the cluster.

Once the pod has reported back as up, we build out a list of jobs to run for the CI build, depending on what options the user has configured for their project. Finally, we sort those jobs out, and dispatch them as a a chained set of jobs that are executed against the Kubernetes pod we just created:

$chain = [
            'composer_install' => [
                ['step' => new InstallDeps($build), 'weight' => 0],
            ],
            'lint_code' => [
                ['step' => new LintCode($build), 'weight' => 1]
            ],
            'theme_compile' => [
                ['step' => new BuildTheme($build), 'weight' => 2]
            ],
            'deploy_to_pantheon' => [
                ['step' => new TerminusAuth($build), 'weight' => 3],
                ['step' => new TerminusAddSshKey($build), 'weight' => 4],
                ['step' => new PrepareForPantheon($build), 'weight' => 5],
                ['step' => new DeployToPantheon($build), 'weight' => 6],
                ['step' => new TerminusUpdateDatabase($build), 'weight' => 7],
                ['step' => new TerminusConfigImport($build), 'weight' => 8],
                ['step' => new TerminusClearDrupalCache($build), 'weight' => 9],
                ['step' => new TerminusClearEnvCache($build), 'weight' => 10],
                ['step' => new TerminusCleanUpPullRequests($build), 'weight' => 11],
            ],
            'default' => [
                ['step' => new CleanupBuild($build), 'weight' => 12],
            ]
        ];
        $doing = collect($build->project->build_steps)->flatMap(function ($step) use ($chain) {
            return $chain[$step];
        })->unique()->values()->sortBy('weight')->pluck('step')->all();
        CloneRepo::withChain($doing)->dispatch($build)->allOnQueue('builds');

Each one of those new calls is a different job that gets dispatched onto the queue, and they are set to process in order, and on failure of any of the jobs, the pod is cleaned up and the results are reported.

I hope this inspires you to think of some things in your application that could be better handled by queued jobs. If you have any questions, reach out to me either via the contact form or on Twitter @DustinLeblanc.

Get updates straight to your inbox

We'll send out an update about once a week with new posts from the blog and news about what we're up to.

We care about the protection of your data. Read our Privacy Policy.

© 2024 Unrealist Technologies, LLC. All rights reserved.