
Reacting to events at Orus

· 14 min read
Samuel Rossille
Chief Technology Officer

In our previous article, we introduced how event sourcing works at Orus. Instead of mutating rows in a database, we append immutable events to an event store. We then derive the current state of any entity (a contract, an invoice, a user, or whatever...) by replaying those events through a reducer.

That covers writing data and reading it. But there's a third thing every real application needs to do: react to what happened.

When a contract is signed, an invoice needs to be generated. When a payment fails, a broker needs to be notified. When a user uploads a document, an AI analysis should kick off. None of that happens by itself just because you appended an event.

This calls for additional infrastructure.

This article is the story of how we built something that plays nice with event sourcing. We'll walk through what we started with, why it worked, and how we improved it over time and arrived at what we have today. If you want to skip ahead to the current system, jump straight to Jobs and dispatchers.

Store consumer workers

We started with a simple primitive: the store consumer worker.

It's a background process that reads from one event store and calls a handler for each event, in order. It keeps track of how far it has gotten using a cursor stored in MongoDB, so it picks up where it left off if it restarts. New events are handled immediately via Redis pub/sub messages, and if no message arrives for a given (short) interval, a proactive query on the database makes sure no event was missed.

storeConsumerWorkerFactory.createStoreConsumerWorker(
  contractStore,
  'contract_signed',
  async (event) => {
    await welcomeEmailService.sendWelcomeEmail(event.payload.contractId)
  },
)

The semantics are straightforward: at-least-once delivery, sequential per worker, with the cursor advancing after each successful call. If the handler throws, the worker stops, the cursor doesn't move, and the same event is retried on the next run.
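To make that concrete, here's a minimal sketch of the consumption loop. The loadCursor, saveCursor, waitForWakeUpOrTimeout, and readEventsAfter names are ours, not the actual Orus code, and this only shows the polling path, not the Redis pub/sub wake-up:

type StoredEvent = { id: string; type: string; payload: any }

interface EventStore {
  readEventsAfter(cursor: string | null, eventType: string): Promise<StoredEvent[]>
}

// Hypothetical helpers for cursor persistence and wake-up signaling
declare function loadCursor(workerName: string): Promise<string | null>
declare function saveCursor(workerName: string, eventId: string): Promise<void>
declare function waitForWakeUpOrTimeout(): Promise<void>

async function runStoreConsumerWorker(
  workerName: string,
  store: EventStore,
  eventType: string,
  handler: (event: StoredEvent) => Promise<void>,
): Promise<void> {
  while (true) {
    // Last processed event ID, persisted in MongoDB
    const cursor = await loadCursor(workerName)
    const events = await store.readEventsAfter(cursor, eventType)
    for (const event of events) {
      await handler(event) // if this throws, the cursor stays put and the event is retried
      await saveCursor(workerName, event.id)
    }
    // Wake up on a Redis pub/sub message, or after a short interval as a safety net
    await waitForWakeUpOrTimeout()
  }
}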

This primitive is what powers persisted views. The subscription view you can query to list contracts? It's maintained by store consumer workers listening to the relevant event stores. Whenever a new event arrives, the worker recomputes the affected document and upserts it into a MongoDB collection.
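For illustration, such a view-maintaining worker could look like this sketch (the 'contract_updated' event type and the getEventsForEntity, contractReducer, initialContractState, and contractViewCollection names are ours, not the actual Orus code):

// Hypothetical view maintenance: recompute the affected document from the
// entity's full event history and upsert it into the view collection.
storeConsumerWorkerFactory.createStoreConsumerWorker(
  contractStore,
  'contract_updated',
  async (event) => {
    const contractId = event.payload.contractId
    const events = await contractStore.getEventsForEntity(contractId)
    const state = events.reduce(contractReducer, initialContractState)
    await contractViewCollection.updateOne(
      { _id: contractId },
      { $set: state },
      { upsert: true }, // insert the document if it doesn't exist yet
    )
  },
)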

Reactors

Store consumer workers are flexible, but wiring them up manually for every use case gets repetitive quickly. Each worker needs a unique name, a starting point, and the boilerplate to create it. When you have dozens of reactions to register across a codebase, that adds up.

So we built reactors as a layer of syntactic sugar on top.

A reactor is defined as a map of event types to handler functions. Under the hood it creates one store consumer worker per event type.

Here's what it could look like:

const userAccountReactors: Reactor<UserAccountEvent> = {
  'email_verified': {
    react: async (event) => {
      await notificationService.sendNotification({
        type: 'email_verified',
        userId: event.payload.userId,
      })
    },
  },
  'user_banned': {
    react: async (event) => {
      await authService.disableUserAccount(event.payload.userId)
    },
  },
}
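The desugaring is mechanical. Something like this hypothetical registerReactor (an illustrative name, not the actual API) would walk the map and create one worker per entry:

// Hypothetical desugaring: one store consumer worker per event type in the map.
function registerReactor<E extends { type: string }>(
  store: EventStore,
  reactorName: string,
  reactor: Reactor<E>,
): void {
  for (const [eventType, definition] of Object.entries(reactor)) {
    // A unique worker name can be derived from the reactor and the event type,
    // e.g. `${reactorName}.${eventType}`
    storeConsumerWorkerFactory.createStoreConsumerWorker(store, eventType, definition.react)
  }
}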

What we loved about it

The reactor model has one quality that became more and more apparent as the system grew: it organizes code around what happened in the business, not around what a service does. Each new behavior lives in isolation in its own reactor instead of accumulating in existing code.

Take a contract signature. In a classic CRUD backend, the logic to handle a signature might live inside a ContractService.sign() method that accumulates responsibilities over time: generate the invoice, notify the broker, run a compliance check, fire a webhook, update the search index... Error management, retries, and coordination are all left to the developer, which quickly leads to convoluted code and subtle bugs.

With reactors, each reaction lives in its own handler, registered independently. The team working on compliance doesn't need to touch the invoicing code. A new integration (say, a partner webhook) is just a new reactor on contract_signed. No existing code needs to be modified, and each individual reaction is self-contained and easy to understand.

When a contract is signed today, here is what independently reacts to that single event:

  • The invoicing module generates the first invoice
  • The search module updates the contract index
  • The compliance module runs an AML scan
  • The data pipeline pushes the contract to the analytics platform
  • The Qonto module, if applicable, creates the insurance policy on the partner side
  • The organization activity module records the signature in the activity feed
  • And many more...

None of those modules know about each other. They're all just listening to the same event.

The limitations of the model

This worked well for a long time. But as the system grew, some problems started to show up.

A crude retry model.

If a handler throws, the worker retries a few times and then stops processing events for that event type entirely. This is unavoidable for the specific entity that caused the problem. You can't skip it and move on, because the next event for that entity might depend on this one being handled first. But it also blocks every other entity. If a bug prevents the welcome email from being sent to one specific user, no welcome email can be sent to anyone until the issue is fixed.

This is an unnecessary (and sometimes unacceptable) degradation of the user experience. It led us to implement ad hoc graceful degradations in the most critical places, and even things that started to look dangerously like a badly implemented dead-letter queue.

No scheduling.

Reactors fire immediately and synchronously. There's no way to say "send this email 24 hours from now." We worked around it with cron jobs and separate scheduled tasks. This introduced additional infrastructure needs and tedious boilerplate for relatively simple things.

No state awareness.

A reactor only receives the raw event and the store. It has no concept of the aggregate state before or after the event. For handlers that need to know what the contract looked like before the change, we had to write a workaround that manually re-fetched all previous events and reduced them.
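The workaround looked roughly like this sketch (getEventsForEntity, sequenceNumber, contractReducer, initialContractState, and ContractState are assumed names): replay everything that precedes the triggering event to recover the "before" state.

// Hypothetical reconstruction of the pre-event state: one full re-fetch
// and reduce per handled event.
async function getStateBefore(store: EventStore, event: StoredEvent): Promise<ContractState> {
  const history = await store.getEventsForEntity(event.payload.contractId)
  const priorEvents = history.filter((e) => e.sequenceNumber < event.sequenceNumber)
  return priorEvents.reduce(contractReducer, initialContractState)
}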

It worked, but it was expensive in terms of CPU and database reads, brittle, and hard to reason about.

The dead end

As Orus scaled, it became more and more obvious that we needed a better approach if we wanted the business code to stay simple and free of infrastructure concerns.

Jobs and dispatchers

The v2 system splits the reactor's single responsibility into two distinct concepts.

A dispatcher reads from the event store and decides what work needs to be triggered. It's purely declarative. No side effects. Just "given this event, here are the tasks I want to enqueue."

A job defines how to do a piece of work. It's where the actual logic lives: sending the email, calling the external API, updating the database, etc. The job system handles all the operational concerns: retries, concurrency, scheduling, monitoring.

The dispatcher is the bridge between the event world and the job world.

Jobs

A job has a name, a typed payload, a retry policy, a concurrency setting, and a process function.

export const sendBrokerContractTerminatedNotificationEmailJob = job({
  name: 'send_broker_contract_terminated_notification_email',
  description: 'Send email to broker 24h after their client contract is terminated',
  payloadSchema: type({
    contractId: 'string',
    terminationReason: 'string',
  }),
  version: 1,
  concurrency: DEFAULT_JOB_CONCURRENCY,
  autoRetryStrategyBeforeStall: DEFAULT_JOB_AUTO_RETRY_STRATEGY_BEFORE_STALL,
  process:
    ({ contractView, contractTerminatedBrokerNotificationEmailService }) =>
    async ({ task }) => {
      const { contractId, terminationReason } = task.payload

      const stateAndContext = await contractView.getContractAndStateContextById(contractId, 'display')
      // ...
      await contractTerminatedBrokerNotificationEmailService.sendEmailIfApplicable({
        contract: stateAndContext.contract,
        terminationReason,
      })
    },
})

A task is an instance of a job: a concrete unit of work with a specific payload. Tasks live in a MongoDB collection. No new infrastructure: the same database that stores events also stores the task queue.

Concurrency keys

Two of our original requirements seemed contradictory: we want tasks to be processed in order, and we want to process as many tasks in parallel as possible.

The resolution is that we almost never need global ordering. We need per-entity ordering. We want the tasks for a given contract to be processed in order, but two different contracts can be processed concurrently without any problem.

That's what a concurrency key is. Tasks with the same concurrency key for a given job are guaranteed to run in order. Tasks with different concurrency keys run in parallel.

Job "send_email"

key "contract:abc" → task 1 → task 3 → task 4
key "contract:xyz" → task 2 → task 5 → task 6

The two queues run independently. A slow or failing task in contract:abc doesn't block contract:xyz at all.

When a task exhausts its retries, its concurrency key is marked as stalled. Everything in that key stops. Everything else keeps running normally. A developer can go to the backoffice, see exactly what stalled and why, and restart it with a button.
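Concurrency keys are plain strings, so any stable entity identifier works. The contractConcurrencyKey helper that appears in the dispatcher below might be as simple as this guess:

// Hypothetical helper: all tasks sharing this key for a given job run in order.
const contractConcurrencyKey = (contractId: string): string => `contract:${contractId}`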

Dispatchers

A dispatcher subscribes to an event store, iterates events one by one in order, and for each event returns a list of task requests to enqueue.

Here's the one that handles contract terminations:

export const sendBrokerContractTerminatedNotificationDispatcher = dispatcher({
  name: 'send_broker_contract_terminated_notification',
  getStore: ({ contractEventStore }) => contractEventStore,
  eventTypes: ['protection_status_changed'],
  autoRetryStrategyBeforeStall: DEFAULT_DISPATCHER_AUTO_RETRY_STRATEGY_BEFORE_STALL,
  dispatch: () => async ({ event }) => {
    if (event.payload.newStatus !== 'terminated') {
      // If the new status is not terminated, we don't need to do anything
      return []
    }

    const scheduledAt = new Date(event.payload.newStatusStartTimestamp + 24 * 60 * 60 * 1000)

    return [
      taskRequest({
        job: sendBrokerContractTerminatedNotificationEmailJob,
        idempotencyKey: event.idempotencyKey,
        payload: {
          contractId: event.payload.contractId,
          terminationReason: event.payload.reason,
        },
        concurrencyKey: contractConcurrencyKey(event.payload.contractId),
        scheduledAt,
      }),
    ]
  },
  bootstrap: () => ({ type: 'custom', dispatchManually: () => async function* () {} }),
})

Three things worth noticing:

The dispatcher is purely a routing decision. It doesn't load any data, doesn't call any service. It just looks at the event and decides what to create. If the contract wasn't terminated, it returns an empty array and moves on. Although it's technically possible to implement more complex logic here, it's a slippery slope to creating a bottleneck due to the sequential processing of events. Going down this path would defeat the purpose of the dispatcher, and we avoid it.

Scheduling is just a field. The scheduledAt is set to 24 hours after the termination timestamp. The job system takes care of not running the task before that time. The intent to notify the broker the day after termination is expressed in one line, right next to the business logic that determines when to trigger it.

The retry policy and stall behavior belong to the dispatcher too. If the dispatcher fails to process an event (say, a transient database error while enqueuing tasks), it retries according to its own policy. If that's exhausted, the dispatcher stalls and the backoffice shows exactly which event it stopped on.

Bootstrapping

A dispatcher keeps a single cursor: the ID of the last event it processed. When it starts for the first time, it needs to know where to begin.

There are three strategies:

  • beginning: start from the very first event in the store. Useful when you need to rebuild a full projection from scratch.
  • lastEventToSkipQuery: skip everything before a given timestamp or app version, and only process events from that point forward.
  • custom: fetch the latest event ID, run an arbitrary async generator to enqueue an initial batch of tasks (for example, one task per existing entity rather than one per historical event), then resume processing new events from that point.

The custom strategy is what makes it practical to introduce a new materialized view on a store with hundreds of thousands of events. Instead of replaying every event one by one, you iterate the existing entities, enqueue one task each, and start fresh from there.
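As a sketch, such a custom bootstrap could look like this, assuming dependencies are injected the same way as in dispatch and that the view exposes a listContractIds iterator (both assumptions, along with the rebuildContractSearchIndexJob name):

// Hypothetical custom bootstrap: enqueue one task per existing contract
// instead of dispatching every historical event.
bootstrap: () => ({
  type: 'custom',
  dispatchManually: ({ contractView }) =>
    async function* () {
      for await (const contractId of contractView.listContractIds()) {
        yield taskRequest({
          job: rebuildContractSearchIndexJob, // hypothetical job
          idempotencyKey: `bootstrap:${contractId}`,
          payload: { contractId },
          concurrencyKey: contractConcurrencyKey(contractId),
        })
      }
    },
}),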

One constraint worth noting: the custom bootstrap phase has no cursor to fall back on if it fails partway through. It's designed to be fast and lightweight — enumerate entities, enqueue tasks, done. If you find yourself loading complex views or performing heavy computation during bootstrap, that's a sign the approach may not be the right fit for that use case.

How the infrastructure works

The task queue lives in a single MongoDB collection (job_tasks_queues), with one document per (job, concurrencyKey) pair. The document holds the currently active task at its head, and the ordered backlog behind it.

When a processor finishes a task, it atomically promotes the next one from the queue. Tasks are claimed with a short-lived lease (60 seconds) renewed every 20 seconds while the job is running. If a pod crashes mid-task, the lease expires and another pod picks it up.
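A sketch of what the queue document and the atomic claim could look like (the field names are our guesses, not the actual schema):

import type { Collection } from 'mongodb'

// Simplified task shape for this sketch
type Task = { payload: unknown; scheduledAt?: Date }

// Hypothetical shape of a job_tasks_queues document: one per (job, concurrencyKey).
interface JobTasksQueueDoc {
  _id: string                 // e.g. 'send_email::contract:abc'
  jobName: string
  concurrencyKey: string
  head: Task | null           // the currently active task
  backlog: Task[]             // ordered tasks waiting behind it
  leaseExpiresAt: Date | null // the claim expires if not renewed in time
  stalled: boolean
}

declare const jobTasksQueues: Collection<JobTasksQueueDoc>

// Atomically claim a head task if no live lease is held on it.
async function claimNextTask(jobName: string) {
  const now = new Date()
  return jobTasksQueues.findOneAndUpdate(
    {
      jobName,
      stalled: false,
      head: { $ne: null },
      $or: [{ leaseExpiresAt: null }, { leaseExpiresAt: { $lt: now } }],
    },
    { $set: { leaseExpiresAt: new Date(now.getTime() + 60_000) } }, // 60-second lease
  )
}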

Work is distributed across pods using rendezvous hashing — each dispatcher and job processor runs on exactly one pod at a time, with no central coordinator.

:::note Rendezvous hashing

Each pod independently computes a score for every (podId, itemId) pair using the same deterministic hash function, and claims the items where it scores highest. Because every pod runs the same calculation against the same list of live pods, they all converge on the same assignment without talking to each other. When a pod is added or removed, only the items it owned get reassigned — everything else stays put.

:::
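In code, the assignment rule fits in a few lines. A toy sketch, using FNV-1a purely for illustration (any deterministic hash shared by all pods works):

// Each pod claims the items where it scores highest; every pod computes
// the same scores from the same list of live pods, so they all agree.
function ownerOf(itemId: string, livePodIds: string[]): string {
  let best = livePodIds[0]
  let bestScore = -Infinity
  for (const podId of livePodIds) {
    const score = fnv1a(`${podId}:${itemId}`)
    if (score > bestScore) {
      bestScore = score
      best = podId
    }
  }
  return best
}

// Minimal 32-bit FNV-1a string hash, deterministic across pods.
function fnv1a(s: string): number {
  let h = 0x811c9dc5
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i)
    h = Math.imul(h, 0x01000193) >>> 0
  }
  return h
}

// Usage: a pod runs only the dispatchers and job processors it owns, e.g.
// const mine = allItemIds.filter((id) => ownerOf(id, livePodIds) === myPodId)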

None of this required new infrastructure. It's all MongoDB and the Redis we already had.

Migrating from reactors to jobs and dispatchers

We didn't rewrite everything overnight. The two systems coexist, and we migrate reactors to jobs and dispatchers gradually.

The tricky part of any such migration is the handover: you need to stop the old system and start the new one without dropping any events or processing any event twice.

We handle this with a turning point date. The v1 reactor is told to stop processing events at or after a specific timestamp. The v2 dispatcher is configured to only start from that same timestamp. Before processing any event for the first time, the dispatcher waits until the reactor has confirmed it's caught up to that point in the stream.

// workers-v1-to-v2.ts
export const TURNING_POINT_DATE_PER_CONSUMER_NAME = {
  'store.quote_input_set_event.reactors.changeSubscriptionUser.update':
    new Date('2025-12-23T13:37:00.000Z'),
}

Once the v2 dispatcher has taken over and everything looks healthy in the logs, a second PR removes the turning point configuration and deletes the old reactor. Clean cut.

And to make the migration easier, we've added an AI agent skill that automatically generates the two PRs required for migrating a reactor to a dispatcher and a job.

Conclusion

Reactors gave us the right mental model from the start: organize code around what happened in the business, not around what a service does. That separation has real value, and we've kept it.

What jobs and dispatchers add on top is everything you need to operate that code with confidence in production: retry policies, per-entity failure isolation, deferred scheduling, and a monitoring UI that tells you exactly what's happening at any point in time.

The event store isn't just where you write data anymore. It's the clock that drives everything else.

I want to close with a personal note. This system was engineered by Marvin, and I'm very grateful for the care and thoughtfulness he put into it.

I wasn't convinced at first. Building an in-house job queue initially felt like nothing less than a rookie mistake.

What changed my mind was a combination of two things. First, no additional critical infrastructure to operate. No Kafka cluster to provision, monitor, and tune, no RabbitMQ to babysit in production. Second, and more subtly: the business code has no idea any of this exists. Dispatchers and jobs are pure TypeScript. If we ever needed to swap the underlying queue for Kafka or RabbitMQ, the migration surface would be small and well-contained — the business logic wouldn't move at all. Six months in, the system runs smoothly. And when something does go wrong, being able to inspect and restart a stalled job directly from the backoffice, rather than digging through a Kafka consumer group offset, is a quality-of-life improvement that's hard to overstate.

:::info

Want to work with event sourcing? We're based in Paris and we're hiring Software Engineers! Check out our open positions!

:::