How a durable workflow engine works: you might not need a queue
Tony Holdstock-Brown · 9/28/2023 · 9 min read
Contents
- What are durable workflow engines?
- What are the benefits?
- How workflow engines work
- How event-driven workflow engines work
Introduction
Much has been said about queues recently. Developers are up in arms arguing over whether a database can be a queue, or whether you should stick with the queues we've had since the 90s. With workflow engines, the implementation details of a queue shouldn't matter. You should be able to "just write code" without thinking about queues, state, or events at all. The conversation about queues misses a point: we're moving on from decades-old technology.
What are durable workflow engines?
Durable workflow engines are a powerful emerging technology that lets developers write reliable code without worrying about queues or state management. A good workflow engine is code-first: it should get out of your way and let you think about your code. We're not talking about the Business Process Model and Notation (BPMN) of the 2000s here.
Without workflow engines, it's common to write code that runs as a task within a queue, push jobs to that queue, and chain jobs together to build what you need, managing state manually. Using a workflow engine is much simpler: you write code as a series of steps in a single function. Each step is enqueued and attempted until it succeeds, automatically.
Here's an example of a billing function written as a workflow (using TS pseudocode, but any workflow engine works):
```typescript
// A single workflow function.
//
// `step` contains workflow primitives:
// - `step.run`, a single "transaction", enqueued and run once on success
// - `step.sleep`/`step.sleepUntil`, to suspend a workflow for some amount of time
//
async ({ event, step }) => {
  // Wait until the next billing date.
  await step.sleepUntil(event.data.invoiceDate);

  for (let i = 0; i < 3; i++) {
    const charge = await step.run("Attempt charge", async () => {
      // Note: This throws an error and retries automatically if the charge
      // cannot be attempted. If a charge is attempted, the returned
      // object will specify whether the charge is successful.
      return await lib.stripe.charges.create({
        user: event.data.accountId,
        amount: event.data.amount,
      });
    });

    if (charge.success) {
      // Run two steps in parallel, both enqueued for reliability.
      // Note that both steps finish before the function continues, and the output
      // of both steps is available to the function in the future.
      const [update, send] = await Promise.all([
        step.run("Update DB", async () => {
          return await db.payments.upsert(charge);
        }),
        step.run("Send receipt", async () => {
          return await resend.emails.send({
            to: event.user.email,
            subject: "Your receipt for Inngest",
          });
        }),
      ]);

      // Done. Return the charge for observability.
      return charge;
    }

    // Wait 24 hours and retry.
    await step.sleep("24h");
  }

  // If we're here, we've retried 3 times and we haven't been able to
  // succeed with a payment. Handle the failure.
  await step.run("Handle permanently failed payment", async () => {
    // Await the call so that a failed suspension fails (and retries) the step.
    await lib.accounts.suspend(event.data.accountId);
  });
};
```
This function uses workflow engine primitives to create reliable, retriable steps within a single block of code. Workflow engines should retry steps on error - whether that's a network error, worker crash, or some other failure. Each step's data should be recorded in "memory", available for use later within the function.
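To make the retry behaviour concrete, here's a minimal sketch of retrying a single step. `runWithRetries` is a hypothetical helper, not part of any real SDK, and it loops in-process for brevity; a real engine would more likely re-enqueue the step with a backoff delay.

```typescript
// Hypothetical helper: attempt a step until it succeeds or attempts run out.
// A real engine would re-enqueue the step with backoff instead of looping here.
function runWithRetries<T>(
  fn: (attempt: number) => T,
  maxAttempts: number
): { value: T; attempts: number } {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return { value: fn(attempt), attempts: attempt };
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}

// Usage: a flaky step that fails twice (say, a network error) before succeeding.
const flaky = runWithRetries((attempt) => {
  if (attempt < 3) throw new Error(`network error on attempt ${attempt}`);
  return "charged";
}, 5);
```

The function body never sees the failed attempts. It only sees the value of the attempt that succeeded.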
What are the benefits?
Writing code in this style has lots of benefits. One important benefit is that the code is declarative and colocated. Instead of having many scattered jobs that link to each other, you write a single function with straightforward logic. This is simpler to write, maintain, and debug.
Depending on the workflow engine or platform that you use, lots of other nuances should also be taken care of for you. Things like retries, idempotency, observability, concurrency, and alerting are all provided (with varying levels of ease or defaults). This is a force multiplier. Any engineer can pick this up and work on complex sequences without worrying about race conditions or the complexity of, e.g., idempotency or backpressure. Frontend engineers, backend engineers, and product engineers can jump right in without learning queueing specifics. This frees up time and makes teams more effective.
How workflow engines work
Every workflow engine provides primitives that developers use to assemble their core workflows.
Two primitives available in every platform are steps (single transactions that must complete as close to exactly once as possible) and sleeps (the ability to wait a specific amount of time and then resume functions). Both of these require a queue under the hood. Fundamentally, therefore, all workflow engines abstract some sort of queue.
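As a rough illustration of why both primitives map onto a queue, here's a sketch of an in-memory delay queue. `DelayQueue` and the job names are made up for this example: a step is a job that is due immediately, and a sleep is a job that is due later.

```typescript
// Hypothetical in-memory delay queue. Real engines use durable storage.
interface Job {
  name: string;
  runAt: number; // timestamp (ms) at which the job becomes due
}

class DelayQueue {
  private jobs: Job[] = [];

  enqueue(name: string, runAt: number): void {
    this.jobs.push({ name, runAt });
    this.jobs.sort((a, b) => a.runAt - b.runAt); // oldest due time first
  }

  // Remove and return every job that is due at `now`.
  dequeueDue(now: number): Job[] {
    const due = this.jobs.filter((job) => job.runAt <= now);
    this.jobs = this.jobs.filter((job) => job.runAt > now);
    return due;
  }
}

const DAY_MS = 24 * 60 * 60 * 1000;
const queue = new DelayQueue();
queue.enqueue("step: attempt charge", 0);           // a step: due right away
queue.enqueue("sleep: resume workflow", DAY_MS);    // a sleep: due in 24 hours

const dueAtStart = queue.dequeueDue(0).map((job) => job.name);
const dueAfterOneDay = queue.dequeueDue(DAY_MS).map((job) => job.name);
```

Passing the clock in as an argument keeps the sketch deterministic. A worker would poll with the current time instead.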
Modern workflow engines typically work in a similar way:
- When a workflow is initialized (a run), a job is enqueued with the workflow's arguments (the input event) recorded.
- An execution engine reads jobs from the queue and invokes the workflow, passing the workflow's current state as an argument to the SDK.
- The SDK reads the current state and invokes the underlying workflow code. Any previously completed step is memoized so that it doesn't re-run.
- The next step in the workflow runs. The workflow run's state is updated atomically, and a new step is enqueued.
At a high level, it's a pretty simple loop. Run the workflow, check for a new step, update state, and re-run the function.
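Here's a minimal, synchronous sketch of that loop. All names (`StepTools`, `StepInterrupt`, `invokeOnce`, `runWorkflow`) are hypothetical, state lives in a plain object rather than durable storage, and retries are left out. It isn't how any particular engine is implemented; it only shows the memoize-and-re-run idea.

```typescript
type State = Record<string, unknown>;

// Thrown to unwind the workflow function right after a new step has executed.
class StepInterrupt {
  constructor(readonly stepId: string) {}
}

interface StepTools {
  run<T>(id: string, fn: () => T): T;
}

type Workflow<R> = (step: StepTools) => R;

// Invoke the workflow once. At most one new step executes per invocation.
function invokeOnce<R>(workflow: Workflow<R>, state: State): { done: boolean; result?: R } {
  const step: StepTools = {
    run<T>(id: string, fn: () => T): T {
      if (id in state) return state[id] as T; // memoized: don't re-run
      state[id] = fn();                        // run the new step, record its output
      throw new StepInterrupt(id);             // stop here; the engine re-invokes later
    },
  };
  try {
    return { done: true, result: workflow(step) };
  } catch (err) {
    if (err instanceof StepInterrupt) return { done: false };
    throw err;
  }
}

// The engine loop. A real engine would pull one queued job per iteration.
function runWorkflow<R>(workflow: Workflow<R>): { result: R; invocations: number } {
  const state: State = {};
  let invocations = 0;
  for (;;) {
    invocations++;
    const outcome = invokeOnce(workflow, state);
    if (outcome.done) return { result: outcome.result as R, invocations };
  }
}

// Usage: two steps, so the function is invoked three times in total.
const calls: string[] = [];
const demo = runWorkflow((step) => {
  const a = step.run("fetch", () => { calls.push("fetch"); return 2; });
  const b = step.run("double", () => { calls.push("double"); return a * 2; });
  return a + b;
});
```

The workflow function is invoked three times here, but each step body runs only once: earlier results are read back from state on every re-run.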
While you could describe workflow engines as memoization engines, it's also fair to say that workflow engines turn your functions into something like a generator. Each step runs once, and the function yields to new steps once.
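The generator analogy can be sketched directly. In this hypothetical example the workflow is an actual generator that yields one step at a time, and a small driver (`drive`, a made-up name) runs each step and feeds the result back in. It's only an analogy: a generator lives in one process's memory, whereas a durable engine has to survive restarts, which is where memoization comes in.

```typescript
// One unit of work handed from the workflow to the driver.
type Step = { id: string; fn: () => unknown };

// A workflow written as a generator: each `yield` hands over one step.
function* billing(): Generator<Step, string, unknown> {
  const charge = yield { id: "charge", fn: () => ({ ok: true }) };
  yield { id: "receipt", fn: () => "sent" };
  return (charge as { ok: boolean }).ok ? "paid" : "failed";
}

// The driver plays the role of the engine: run a step, resume the function.
function drive<R>(gen: Generator<Step, R, unknown>): { result: R; order: string[] } {
  const order: string[] = [];
  let next = gen.next();
  while (!next.done) {
    const step = next.value;
    order.push(step.id);
    next = gen.next(step.fn()); // the step's output becomes the value of `yield`
  }
  return { result: next.value as R, order };
}

const driven = drive(billing());
```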
This simplicity belies a lot of complexity under the hood to make workflows reliable. What if workflows never end? What if backpressure grows, and workflows are started faster than they finish? How does step parallelism work? These are not simple problems, and the solution changes depending on which workflow engine you use. Some engines push the complexity to you, the developer. Some have no answer to these problems, and stick with single-threaded execution with no concurrency.
How event-driven workflow engines work
Like it says on the tin, event-driven workflow engines combine workflows with the semantics that you'd find in an event stream. If you're familiar with onClick handlers on the web, you're familiar with events.
An event is an object containing data, created whenever something happens. A webhook is an event. A user signup is an event. And so on.
In this style, workflows are simple functions. They run any time specific events are received. You can also sample events from a stream within a function, or add automatic function cancellation based on events:
```typescript
import { lib } from "src/lib";
import { inngest } from "src/inngest/client";

export default inngest.createFunction(
  {
    id: "user-onboarding",
    name: "User onboarding campaign",
    cancelOn: [
      // Automatically cancel this function any time the user is deleted. This prevents
      // developers from having to pass or store workflow run IDs around, or handle state
      // checking within functions.
      {
        event: "app/user.deleted",
        if: "event.data.userId == async.data.userId",
      },
    ],
  },
  { event: "app/user.signup" }, // Automatically run instantly any time this event is received
  async ({ event, step }) => {
    // Wait for the `app/user.profile.completed` event to be received with the same user
    // ID for up to 24 hours. Whenever the event is received, this function will be
    // resumed and `profileComplete` will hold the received event data. If this times out
    // the variable will be `null`.
    const profileComplete = await step.waitForEvent(
      "app/user.profile.completed",
      {
        timeout: "24h",
        if: `async.data.userId == "${event.data.userId}"`,
      }
    );

    if (profileComplete === null) {
      await step.run("send-followup", async () => {
        // Send the user a followup.
      });
    }
  }
);
```
This decouples code. Instead of modifying code which handles user signups to invoke functions, we create a workflow which automatically runs any time the signup event is received. An event can trigger more than one function at once.
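A tiny sketch of that decoupling, with made-up names (`EventRouter`, `register`, `send`): the code that sends the signup event knows nothing about which functions run, and two functions are triggered by the same event.

```typescript
type AppEvent = { name: string; data: Record<string, string> };
type WorkflowFn = (event: AppEvent) => string;

// Hypothetical router mapping event names to the functions they trigger.
class EventRouter {
  private triggers = new Map<string, WorkflowFn[]>();

  register(eventName: string, fn: WorkflowFn): void {
    const fns = this.triggers.get(eventName) ?? [];
    fns.push(fn);
    this.triggers.set(eventName, fns);
  }

  // Run every function registered for this event; unknown events trigger nothing.
  send(event: AppEvent): string[] {
    const fns = this.triggers.get(event.name) ?? [];
    return fns.map((fn) => fn(event));
  }
}

const router = new EventRouter();
router.register("app/user.signup", (e) => `onboarding started for ${e.data.userId}`);
router.register("app/user.signup", (e) => `welcome email queued for ${e.data.userId}`);

// The signup handler only sends the event.
const triggered = router.send({ name: "app/user.signup", data: { userId: "u_1" } });
const untriggered = router.send({ name: "app/user.deleted", data: { userId: "u_1" } });
```

Adding a third function for signups means one more `register` call. The code that sends the event doesn't change.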
To make this happen, a regular workflow engine needs a few additions. The engine itself needs to be modified to work with event streams. It needs to combine queues with message brokers. This is a non-trivial change.
When an event is received by the workflow engine, a few things happen in parallel:
- New function runs are started if they're triggered by the event
- Function cancellations are evaluated and processed
- waitForEvent signals are evaluated and processed
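As a hedged sketch of those three checks, assume each run records which event it is waiting for and which event cancels it, and that matching is done on a `userId` field. The `Run` shape and `processEvent` are hypothetical, and the checks happen one after another here purely for readability, whereas a real engine evaluates them in parallel.

```typescript
type UserEvent = { name: string; data: { userId: string } };

interface Run {
  fn: string;
  userId: string;
  status: "running" | "waiting" | "cancelled";
  waitingFor?: string; // event name a waitForEvent-style step is paused on
  cancelOn?: string;   // event name that cancels this run
}

interface Result { started: string[]; cancelled: string[]; resumed: string[] }

function processEvent(event: UserEvent, runs: Run[], triggers: Record<string, string[]>): Result {
  const result: Result = { started: [], cancelled: [], resumed: [] };
  const userId = event.data.userId;

  // Evaluate cancellations and wait signals against existing runs for this user.
  for (const run of runs) {
    if (run.status === "cancelled" || run.userId !== userId) continue;
    if (run.cancelOn === event.name) {
      run.status = "cancelled";
      result.cancelled.push(run.fn);
    } else if (run.status === "waiting" && run.waitingFor === event.name) {
      run.status = "running";
      delete run.waitingFor;
      result.resumed.push(run.fn);
    }
  }

  // Start new runs for every function triggered by this event.
  for (const fn of triggers[event.name] ?? []) {
    runs.push({ fn, userId, status: "running" });
    result.started.push(fn);
  }
  return result;
}

const runs: Run[] = [
  { fn: "user-onboarding", userId: "u_1", status: "waiting", waitingFor: "app/user.profile.completed", cancelOn: "app/user.deleted" },
  { fn: "user-onboarding", userId: "u_2", status: "waiting", waitingFor: "app/user.profile.completed", cancelOn: "app/user.deleted" },
];
const triggers: Record<string, string[]> = { "app/user.signup": ["user-onboarding"] };

const onProfile = processEvent({ name: "app/user.profile.completed", data: { userId: "u_1" } }, runs, triggers);
const onDelete = processEvent({ name: "app/user.deleted", data: { userId: "u_2" } }, runs, triggers);
const onSignup = processEvent({ name: "app/user.signup", data: { userId: "u_3" } }, runs, triggers);
```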
When triggering new function runs, several extra things happen automatically (though this is unique to Inngest):
- User-defined rate limiting is applied for each function
- User-defined debounces are applied for each function
- Function-based idempotency is checked and applied
- Events are bundled into batches, if a function opts in to event batching
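Two of these ideas, idempotency and batching, are easy to sketch in isolation. The helpers below (`dedupe`, `batch`) are hypothetical and in-memory, and they say nothing about how Inngest implements either feature internally. They only show what the behaviour amounts to.

```typescript
type Incoming = { id: string; name: string };

// Idempotency: keep only the first event seen for each key.
function dedupe<E>(events: E[], keyOf: (event: E) => string): E[] {
  const seen = new Set<string>();
  return events.filter((event) => {
    const key = keyOf(event);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}

// Batching: group events into chunks of at most `maxSize`.
function batch<E>(events: E[], maxSize: number): E[][] {
  const batches: E[][] = [];
  for (let i = 0; i < events.length; i += maxSize) {
    batches.push(events.slice(i, i + maxSize));
  }
  return batches;
}

const incoming: Incoming[] = ["a", "b", "a", "c", "d"].map((id) => ({
  id,
  name: "app/invoice.created", // hypothetical event name
}));

const unique = dedupe(incoming, (event) => event.id);
const batches = batch(unique, 3);
const batchIds = batches.map((b) => b.map((event) => event.id).join(""));
```

Here the duplicate `a` event is dropped, and the remaining four events are delivered as one batch of three and one batch of one.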
From there, the workflow engine works as expected: it pushes a job into a queue so that workflows can be executed reliably.
Side note: Inngest is an event-driven workflow engine that provides everything out of the box (e.g. batching, concurrency, rate-limiting, debounce, branch deploys, and so on). Inngest's queue is unique. It creates two queues for every function deployed. Workers are shared-nothing, and pick up work from each queue without communication and with minimal contention, whilst still guaranteeing that the oldest jobs are worked on first. It prioritizes finishing functions in order (excepting retries), and allows simple management of backpressure and load-shedding. This improves scale, running functions seamlessly across a fleet of executors without worrying about conflicting state or coordination. We'll dive into event matching and queueing systems in future blog posts.
Side side note: "workflows" is such an overloaded term, and I think it does this category a disservice. We really need a better name for this style of working.
I hope this gives an overview of workflow engines, how they're used, and the basic execution loops that power them. Workflow engines are modern replacements for queues, offering tooling and semantics above and beyond what raw infrastructure provides. They're a force multiplier, a new type of tool that improves how we build products day to day.