Middleware
v0.3.0+

Middleware allows you to run code at various points in an Inngest function's lifecycle.

class MyMiddleware(inngest.Middleware):
    async def transform_input(
        self,
        ctx: inngest.Context,
    ) -> inngest.Context:
        # Modify the context that is passed to an Inngest function
        return ctx

    async def before_execution(self) -> None:
        # Do stuff before executing an Inngest function
        pass

    async def after_execution(self) -> None:
        # Do stuff after executing an Inngest function
        pass

    async def transform_output(
        self,
        output: inngest.Output,
    ) -> inngest.Output:
        # Transform the output returned by a step or an Inngest function
        return output

    async def before_response(self) -> None:
        # Do stuff before responding to the Inngest server
        pass

inngest_client = inngest.Inngest(
    app_id="my_app",
    middleware=[MyMiddleware],
)