Middleware v0.3.0+
Middleware allows you to run code at various points in an Inngest function's lifecycle.
class MyMiddleware(inngest.Middleware):
async def transform_input(
self,
ctx: inngest.Context,
) -> inngest.Context:
# Modify the context that is passed to an Inngest function
return ctx
async def before_execution(self) -> None:
# Do stuff before executing an Inngest function
pass
async def after_execution(self) -> None:
# Do stuff after executing an Inngest function
pass
async def transform_output(
self,
output: inngest.Output,
) -> inngest.Output:
# Transform the output returned by a step or an Inngest function
return output
async def before_response(self) -> None:
# Do stuff before responding to the Inngest server
pass
inngest_client = inngest.Inngest(
app_id="my_app",
middleware=[MyMiddleware],
)