Events & Subscribers
This adds a Medusa-style event/subscriber system with module auto-discovery, DI integration, and offline processing via local JSON or Redis.
Overview
- Subscribers live under
src/modules/<module>/subscribers/*.tsand export:export const metadata = { event: string, persistent?: boolean, id?: string }export default async function(payload, ctx) { /* ... */ }ctx.resolve(name)resolves services from Awilix per-request container.
- Subscribers discovered at build via
modules:prepareand registered into a global Event Bus via the core bootstrap (@open-mercato/core/bootstrap), which your app calls fromsrc/di.ts. - Emit events programmatically via
eventBus.emitEvent(event, payload, { persistent? }). - Two strategies:
- Local: online delivery + optional persistence to
.events/queue.jsonwith state in.events/state.json. - Redis: online delivery + persistence in Redis sorted set.
- Local: online delivery + optional persistence to
- Offline processing:
npm run mercato events process -- [--limit=N]replays unprocessed persistent events.
File Structure
Example subscriber file src/modules/example/subscribers/order-created.ts:
export const metadata = {
event: 'order.created',
persistent: true, // optional, default false
}
export default async function handle(payload: any, ctx: { resolve: <T=any>(name: string) => T }) {
const em = ctx.resolve('em')
// ... do something with payload using DI services
}
IDs are optional; default is "<module>:<nested_path>".
Emitting Events
From any handler with DI access:
const bus = container.resolve('eventBus')
await bus.emitEvent('order.created', { id: 123, total: 42 }, { persistent: true })
Programmatic Registration
Modules can register subscribers in di.ts:
import type { AppContainer } from '@/lib/di/container'
export function register(container: AppContainer) {
const bus = container.resolve<any>('eventBus')
bus.on('custom.event', async (payload, ctx) => {
const em = ctx.resolve('em')
// ...
})
}
Strategy & Persistence
- Select strategy via
EVENTS_STRATEGY=local|redis(defaultlocal). - Redis URL taken from
REDIS_URLorEVENTS_REDIS_URL. - Persistent events are recorded and can be replayed later.
- Local:
.events/queue.jsonand.events/state.jsonin project root. - Redis: keys
events:last_id,events:queue(sorted set),events:last_processed_id.
- Local:
Offline Processing
Process queued events since last processed id:
npm run mercato events process -- --limit=500
The process uses the DI container, so subscriber handlers can resolve services.
Clear queues:
# Remove all queued events (persistent storage)
npm run mercato events clear
# Remove only events already processed (based on last processed id)
npm run mercato events clear-processed
Emit via CLI
Quickly emit an event from the terminal (useful for testing subscribers or seeding flows):
npm run mercato events emit <event> [jsonPayload] [--persistent|-p]
Examples:
# Simple event without payload (non-persistent)
npm run mercato events emit example.event
# Emit with JSON payload (remember to quote it)
npm run mercato events emit order.created '{"id":123,"total":42.5}'
# Emit a persistent event so it is recorded and can be replayed
npm run mercato events emit order.created '{"id":124}' --persistent
# Shorthand for persistent flag
npm run mercato events emit order.created '{"id":125}' -p
Notes:
- Payload is parsed as JSON when possible; otherwise treated as a string.
- Persistent events are delivered online and also stored (local/Redis) for offline processing.
- The CLI uses the DI container, so subscribers can resolve services via
ctx.resolve.
Notes
- Subscribers are executed online on
emitEvent, and also available for offline replay when persistent. - Input validation and security remain the responsibility of the emitting producer/consumer code.
CRUD Events
The CRUD factory emits standard events for module entities:
<module>.<entity>.created<module>.<entity>.updated<module>.<entity>.deleted
Use these to react to lifecycle changes without tightly coupling modules. Mark them persistent to support offline replay.