Middleware (v0.3.0+)
Middleware allows you to run code at various points in an Inngest function's lifecycle. This is useful for adding custom behavior to your functions, like error reporting and end-to-end encryption.
class MyMiddleware(inngest.Middleware):
    """Example middleware that prints a message around event sending."""

    async def before_send_events(self, events: list[inngest.Event]) -> None:
        """Print how many events are in the batch being sent.

        Args:
            events: The events passed to this hook; only the count is used.
        """
        print(f"Sending {len(events)} events")

    async def after_send_events(self, result: inngest.SendEventsResult) -> None:
        """Print a completion message.

        Args:
            result: The send result passed to this hook; unused in this example.
        """
        print("Done sending events")
# Build the client, passing the middleware class itself (it is not
# instantiated here) in the `middleware` list.
inngest_client = inngest.Inngest(
    app_id="my_app",
    middleware=[MyMiddleware],
)