
AI in production: Managing capacity with flow control

What do you need to take your LLM-based product from demo to production?

Dan Farrelly · 4/18/2024 · 13 min read

Whether it's a core part of your functionality or a behind-the-scenes helper, if you're building right now, you're probably building with AI. AI offers so much functionality to your product that it has become almost expected.

But like any API, once it is integrated into your product, you must manage it. AI APIs are constantly under load, and AI services have responded to that by locking them down with rate limits and high costs. You must carefully monitor and manage your AI usage to avoid exceeding limits or incurring unexpectedly high bills.

Here's how Inngest can help.

The problems you'll hit when you build with AI

If you build AI functionality into your product, your first port of call will probably be the documentation for any AI services. Let's go with OpenAI as an example. Here's the example code for a text generation function:

ts
import OpenAI from "openai";
const openai = new OpenAI();
async function main() {
  const completion = await openai.chat.completions.create({
    messages: [
      { role: "system", content: "You are a helpful assistant." },
      { role: "user", content: "Who won the world series in 2020?" },
      { role: "assistant", content: "The Los Angeles Dodgers won the World Series in 2020." },
      { role: "user", content: "Where was it played?" },
    ],
    model: "gpt-3.5-turbo",
  });
  console.log(completion.choices[0]);
}
main();

Easy. Copy, paste, ready to go.

Obviously not. Taking this simple snippet from POC to production is a challenge. It might work when you have a few users. It's not going to work once you are even into triple-digit customers.

What are the problems you're going to face? We can split them into two basic categories.

Dealing with users

First, you have the good problem of users. Your product is so popular that users just can't help themselves, putting strain on your AI API calls. You have to manage this capacity to ensure a good experience for these users.

Second, you have the bad problem of users—people looking to exploit weaknesses in your service. Unrestrained, these users will spam your APIs, driving up your costs and causing service outages for regular users.

Third, a really good problem to have. Some users will upgrade to a paid account and expect a better experience and more throughput. How will you give that to them using that simple snippet above?

Dealing with the API

So, user problems beget API problems. As your user base grows, you'll run into limitations and challenges with the AI APIs themselves.

First, AI APIs are rate-limited. Scale isn't infinite. Even major providers like OpenAI, Google, and Anthropic must limit their services to ensure they can handle the load and maintain performance.

Second, AI APIs are expensive. The scale they run at costs money, and that cost is passed on to you. OpenAI, for example, charges $30 per 1 million tokens. This may not seem like a lot, but once you have hundreds or thousands of users calling your service each day, costs can quickly add up.
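To get a feel for how quickly that adds up, here is a back-of-the-envelope estimate. The $30 per 1 million tokens price is the figure quoted above; the user count, requests per user, and tokens per request are made-up numbers for illustration, so substitute your own.

```ts
// Back-of-the-envelope daily cost estimate.
// All usage figures passed in below are hypothetical; only the price comes from the article.
function estimateDailyCostUSD(
  users: number,
  requestsPerUserPerDay: number,
  tokensPerRequest: number,
  pricePerMillionTokensUSD: number
): number {
  const tokensPerDay = users * requestsPerUserPerDay * tokensPerRequest;
  return (tokensPerDay / 1000000) * pricePerMillionTokensUSD;
}

// 1,000 users x 20 requests x 1,500 tokens = 30M tokens per day
const dailyCost = estimateDailyCostUSD(1000, 20, 1500, 30);
console.log(`~$${dailyCost} per day`); // ~$900 per day
```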

Third, AI APIs are flaky. Scale doesn't always work perfectly, and in their nascent state these AI services can fail often. You need a way to retry calls so that the flakiness doesn't cascade to your product and impact your users' experience.
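For reference, this is roughly what hand-rolling retries looks like. It is a minimal sketch, not a recommendation: `withRetries`, its parameters, and the default values are hypothetical names and numbers chosen for this example.

```ts
// A hand-rolled retry helper with exponential backoff.
// `withRetries` and its defaults are hypothetical, chosen for this sketch.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        // Double the wait after each failed attempt: base, 2x base, 4x base, ...
        const delayMs = baseDelayMs * Math.pow(2, attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller
  throw lastError;
}
```

A helper like this only lives in memory. If the process restarts halfway through, the attempt count and any earlier results are gone, which is the gap durable execution is meant to close.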

Durable execution and flow control are required. Adding these allows you to run AI reliably while controlling costs and user experience.

Comply with API rate limits with throttling

OpenAI has a 5k requests/minute rate limit on the mid-tier level. This may seem like a lot initially, but it is easy to hit once you are seeing success.

Throttle can be used to flatten spikes in usage and ensure you don't hit the rate limits. Let's say OpenAI releases a new model, and all your users suddenly want a piece of the action. If everyone tries to access the service simultaneously, you get the problems outlined above, such as bumping up against the rate limits, leading to a poor user experience.

With throttle, you can limit the number of calls to a function over a set period. In this case, let's say the OpenAI API can only be called ten times per minute. We can define this limit on the function:

ts
inngest.createFunction(
  {
    id: "ai-generate-summary",
    throttle: {
      limit: 10,
      period: "1m",
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    /* call OpenAI */
  }
);

Here, the configuration ensures that only ten runs of this function will start per minute. Any additional runs are held in a virtual First-In, First-Out queue and start as capacity becomes available in later periods.
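To make the queueing behavior concrete, here is a simplified model of throttling that computes when each queued run would start. It uses fixed one-period windows for readability and is an assumption made for illustration, not Inngest's actual scheduling algorithm; `throttledStartTimes` is a hypothetical helper.

```ts
// Simplified model of throttling: at most `limit` runs may START per period.
// Illustration only; this is not how Inngest schedules runs internally.
function throttledStartTimes(
  arrivalsMs: number[], // arrival time of each event, sorted ascending
  limit: number,
  periodMs: number
): number[] {
  const startsPerSlot = new Map<number, number>();
  let earliestSlot = 0; // FIFO: a later event never starts in an earlier slot
  return arrivalsMs.map((arrival) => {
    let slot = Math.max(earliestSlot, Math.floor(arrival / periodMs));
    // Skip forward until we find a period that still has room
    while ((startsPerSlot.get(slot) ?? 0) >= limit) slot++;
    startsPerSlot.set(slot, (startsPerSlot.get(slot) ?? 0) + 1);
    earliestSlot = slot;
    // Start right away if the current period has room, otherwise when the free period begins
    return Math.max(arrival, slot * periodMs);
  });
}

// 25 events arrive at once; the limit is 10 per minute
const starts = throttledStartTimes(Array.from({ length: 25 }, () => 0), 10, 60000);
console.log(starts); // first 10 start at 0, next 10 at 60000, last 5 at 120000
```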

Smooth usage using concurrency

Concurrency can also be used to flatten out spikes, but it is a little different from throttle. Concurrency is about managing capacity (concurrent work), while throttle is about managing rate. Concurrency limits are useful for preventing a single user from using a disproportionate amount of capacity. They also help when you run your own self-hosted models and there is no specific rate limit, but you want to control how many concurrent calls can be made.

Concurrency can be configured to be applied to different "scopes" within Inngest. You can control concurrency at the function scope (a single queue) or the account scope (across all functions and their queues).

You can also specify a key to apply each concurrency limit to a specific user or group of users. This means the capacity is spread evenly across users for fairness, and no single user can monopolize the service.

ts
inngest.createFunction(
  {
    id: "ai-generate-summary",
    concurrency: {
      scope: "fn",
      key: "event.data.user_id",
      limit: 1,
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    /* call OpenAI */
  }
);

Here, the key is the user ID, which limits each user to a single concurrent call of the function. We can also apply multiple levels of concurrency or combine concurrency with throttle:

ts
inngest.createFunction(
  {
    id: "ai-generate-summary",
    throttle: {
      limit: 10,
      period: "1m",
    },
    concurrency: {
      scope: "fn",
      key: "event.data.user_id",
      limit: 1,
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    /* call OpenAI */
  }
);

So here, this function will only execute ten times per minute, and each user can only have one concurrent execution.
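If it helps to see what a per-key concurrency limit means mechanically, here is a minimal in-memory sketch. `KeyedLimiter` is a hypothetical helper written for this example and is not part of the Inngest SDK; a real implementation would also need to survive restarts and work across multiple servers.

```ts
// Minimal in-memory sketch of a per-key concurrency limit.
// `KeyedLimiter` is hypothetical and only works within a single process.
class KeyedLimiter {
  private readonly limit: number;
  private active = new Map<string, number>();
  private waiting = new Map<string, Array<() => void>>();

  constructor(limit: number) {
    this.limit = limit;
  }

  async run<T>(key: string, task: () => Promise<T>): Promise<T> {
    if ((this.active.get(key) ?? 0) >= this.limit) {
      // No free slot for this key: wait in a FIFO queue until one is handed over
      await new Promise<void>((resolve) => {
        const queue = this.waiting.get(key) ?? [];
        queue.push(resolve);
        this.waiting.set(key, queue);
      });
    } else {
      this.active.set(key, (this.active.get(key) ?? 0) + 1);
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.get(key)?.shift();
      if (next) {
        next(); // pass our slot directly to the next waiter for this key
      } else {
        this.active.set(key, (this.active.get(key) ?? 1) - 1);
      }
    }
  }
}
```

With a limit of 1, two calls under the same key run one after the other, while calls under different keys run side by side. That is the fairness property the `key` option describes.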

Using models with different rate limits

We can also use throttle to manage different rate limits where we can call one API more frequently than another. Let's say you use LLM chaining within your application and can call one LLM for preprocessing more often than the main LLM. In that case, we can create two separate functions:

ts
const highLimitFunction = inngest.createFunction(
  {
    id: "pre-process-text-with-llama",
    throttle: {
      limit: 100,
      period: "1m",
    },
  },
  { event: "ai/preprocess.text" },
  async ({ event, step }) => {
    /* call Llama */
  }
)
const lowLimitFunction = inngest.createFunction(
  {
    id: "summarize-with-openai",
    throttle: {
      limit: 10,
      period: "1m",
    },
  },
  { event: "ai/create.summary" },
  async ({ event, step }) => {
    /* call OpenAI */
  }
)

Then, in a third function, we can reference these two functions:

ts
const mainFunction = inngest.createFunction(
  { id: "main-function" },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    // Preprocess with the higher-limit model first
    const preprocessed_text = await step.invoke("preprocess-text-with-llama", {
      function: highLimitFunction,
      data: { string: event.data.text },
    })
    // Then summarize the preprocessed text with the lower-limit model
    const summary = await step.invoke("summarize-text-with-openai", {
      function: lowLimitFunction,
      data: { string: preprocessed_text },
    })
    return summary
  }
)

The main function calls the two other functions, which each have their own throttle rates. This allows you to manage the rate limits of each API independently, giving you more fine-grained control. This also can be useful if you want to conditionally call one LLM or another based on the input data (for example, free users get routed to the cheaper model with higher limits).

Control costs using debounce

Even if we keep within the rate limit, costs can swell if we're not careful about how we call the APIs. Bad actors can cause the first issue here. Users might spam your services, trying to access the AI APIs continually. This can be caught partly by the concurrency controls above, but if the API is returning quickly and you don't have high load, users might still be able to call the APIs frequently, increasing costs.

Debouncing is an easy answer to this. Debouncing controls how often a single function from a user can run in a set time period:

ts
inngest.createFunction(
  {
    id: "summarize-user-generated-content",
    debounce: {
      period: "1m",
      key: "event.data.user_id",
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    /* call OpenAI */
  }
)

Here, each user can only trigger this function to run after the debounce period, one minute, has passed since the last event was received. For example, for jobs that summarize long user-generated content, like a blog post, you can use debounce to delay the function call until the user has stopped editing for a period of time. This can help avoid wasted calls if the user saves their post, then immediately goes back to fix a typo and saves again.

This should be adjusted for realistic use on your product, but finding the right balance between good user experience for the good users and bad user experience for the bad users will be essential.
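To see how many calls debouncing saves, here is a simplified model of per-key debounce applied to a list of timestamped events. The type and function names are hypothetical and the logic is an approximation for illustration, not Inngest's implementation.

```ts
// Simplified model of per-key debouncing: a run fires only once a key has been
// quiet for `periodMs`. Names are hypothetical; this is an illustration only.
type SaveEvent = { userId: string; atMs: number };
type DebouncedRun = { userId: string; runAtMs: number; eventsCollapsed: number };

function debounceRuns(events: SaveEvent[], periodMs: number): DebouncedRun[] {
  const result: DebouncedRun[] = [];
  const pendingByUser = new Map<string, DebouncedRun>(); // most recent run per user
  const sorted = [...events].sort((a, b) => a.atMs - b.atMs);
  for (const e of sorted) {
    const pending = pendingByUser.get(e.userId);
    if (pending && e.atMs < pending.runAtMs) {
      // A new event arrived before the timer fired: push the run back
      pending.runAtMs = e.atMs + periodMs;
      pending.eventsCollapsed++;
    } else {
      // The user was quiet long enough (or this is their first event): schedule a new run
      const run = { userId: e.userId, runAtMs: e.atMs + periodMs, eventsCollapsed: 1 };
      pendingByUser.set(e.userId, run);
      result.push(run);
    }
  }
  return result;
}

// A user saves a post, fixes a typo 20s later, then saves again 5 minutes after that
const debounced = debounceRuns(
  [
    { userId: "u1", atMs: 0 },
    { userId: "u1", atMs: 20000 },
    { userId: "u1", atMs: 320000 },
  ],
  60000
);
console.log(debounced.length); // 2 runs instead of 3
```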

It's not always bad actors causing problems. As we said above, the AI fail whale often rears its head. If your product requires a single API call to complete, this isn't too much of a problem, cost-wise. But if you are LLM chaining or creating agents or RAG flows with multiple API calls, retrying this multi-step process can result in high costs as everything must be called again.

We can fix that using chaining with durable execution. Every individual API call can become a separate step in a multi-step function. Using multi-step functions, you can:

  • Enhance the reliability of your code by retrying executable blocks.
  • Pause execution until an event matches specific rules.
  • Hit pause on execution for a specified duration or until a certain time.

Here's a single function that chains together OpenAI, Hugging Face, and Anthropic:

ts
export const userWorkflow = inngest.createFunction(
  fnOptions,
  fnListener,
  async ({ event, step }) => {
    const similar = await step.run("query-vectordb", async () => {
      // Query a vectorDB for similar results given input
      const embedding = await createEmbedding(event.data.input)
      // Await the query first, then read `matches` off the resolved result
      const results = await index.query({ vector: embedding, topK: 3 })
      return results.matches
    })
    const response = await step.run("generate-llm-response", async () => {
      // Inject our prompt given similar search results and event.data.input
      const prompt = createAgentPrompt(similar, event.data.input)
      return await llm.createCompletion({
        model: "gpt-3.5-turbo",
        prompt,
      })
    })
    const entities = await step.run("extract-entities-hf", async () => {
      // Extract entities from the generated response using Hugging Face's named entity recognition model
      const pipe = await pipeline(
        "entity-extraction",
        "Xenova/bert-base-multilingual-uncased-sentiment"
      )
      return await pipe(response)
    })
    const summary = await step.run("generate-summary-anthropic", async () => {
      // Generate a summary document using the extracted entities and the Anthropic API
      const anthropic = new Anthropic()
      const anthropicPrompt = `The following entities were mentioned in the response: ${entities.join(
        ", "
      )}. Please generate a summary document based on these entities and the original response:\n\nResponse: ${response}`
      return await anthropic.messages.create({
        model: "claude-3-opus-20240229",
        max_tokens: 1024,
        messages: [{ role: "user", content: anthropicPrompt }],
      })
    })
    await step.run("save-to-db", async () => {
      // Save the generated response, extracted entities, and summary to the database
      await db.summaries.create({
        requestID: event.data.requestID,
        response,
        entities,
        summary,
      })
    })
  }
)

If the OpenAI and Hugging Face calls work but the Anthropic call doesn't, this flow doesn't require you to re-run the first two API calls and incur extra costs. Durable execution means state and retries (as well as logging and observability so you know what went wrong) will be handled for you.
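The cost saving comes from remembering the result of each completed step. Here is a toy model of that idea, assuming results are keyed by step ID. `StepRunner`, `workflow`, and `demo` are hypothetical names for this sketch, the provider calls are stubbed out, and a real system would persist results in durable storage instead of a `Map`.

```ts
// Toy model of step memoization: completed steps are stored by ID, so a retry
// of the whole function skips them. `StepRunner` is hypothetical, not the Inngest SDK.
class StepRunner {
  private results = new Map<string, unknown>(); // stands in for durable storage

  async run<T>(id: string, fn: () => Promise<T>): Promise<T> {
    if (this.results.has(id)) return this.results.get(id) as T;
    const result = await fn(); // if this throws, nothing is stored for `id`
    this.results.set(id, result);
    return result;
  }
}

// Two stubbed "API calls"; `calls` records which ones actually executed
async function workflow(step: StepRunner, calls: string[], failAnthropic: boolean): Promise<string> {
  const response = await step.run("openai", async () => {
    calls.push("openai");
    return "response";
  });
  return step.run("anthropic", async () => {
    calls.push("anthropic");
    if (failAnthropic) throw new Error("provider unavailable");
    return `summary of ${response}`;
  });
}

async function demo(): Promise<string[]> {
  const step = new StepRunner();
  const calls: string[] = [];
  await workflow(step, calls, true).catch(() => {}); // first attempt fails at the last step
  await workflow(step, calls, false); // the retry re-enters from the top
  return calls; // ["openai", "anthropic", "anthropic"]: the first call ran only once
}
```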

Prioritize paid users using priority keys

In all of the above examples, everyone is treated equally.

In the future, our AI overlords will be given priority, but until then paid users go to the head of the queue. As you build out AI services, you need a way for paid users to access your service ahead of others, especially with limited capacity.

Function run priority allows you to do this. With function run priority, you set a priority key that can be used to prioritize specific users' jobs. Here, any user with an 'enterprise' account type will have a higher priority, and their job will be executed ahead of any other job that has been enqueued in the last two minutes:

ts
export default inngest.createFunction(
  {
    id: "unique-function-id",
    priority: {
      run: "event.data.account_type == 'enterprise' ? 120 : 0",
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
    /** call LLM */
  }
)

Users on any other account type are given a priority of 0, so they just join the end of the queue.

This prioritization is critical for AI services that are under heavy load. If you have a lot of users and have to manage access, prioritization ensures the users paying for your service are never at the back of that queue for access.
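One way to picture this is that a priority of N seconds lets a job run as if it had been enqueued N seconds earlier. The sketch below models the queue that way. It is a simplified illustration under that assumption, and `QueuedJob`, `priorityFor`, and `executionOrder` are hypothetical names.

```ts
// Simplified model of run priority: a priority of N seconds lets a job run as if
// it had been enqueued N seconds earlier. Names here are hypothetical.
type QueuedJob = { id: string; enqueuedAtSec: number; accountType: string };

// Mirrors the expression in the config above: enterprise gets 120, everyone else 0
function priorityFor(job: QueuedJob): number {
  return job.accountType === "enterprise" ? 120 : 0;
}

function executionOrder(jobs: QueuedJob[]): string[] {
  const effectiveTime = (job: QueuedJob) => job.enqueuedAtSec - priorityFor(job);
  return [...jobs]
    .sort((a, b) => effectiveTime(a) - effectiveTime(b))
    .map((job) => job.id);
}

const order = executionOrder([
  { id: "free-1", enqueuedAtSec: 0, accountType: "free" },
  { id: "free-2", enqueuedAtSec: 60, accountType: "free" },
  { id: "free-3", enqueuedAtSec: 100, accountType: "free" },
  { id: "ent-1", enqueuedAtSec: 130, accountType: "enterprise" },
]);
console.log(order); // ["free-1", "ent-1", "free-2", "free-3"]
```

The enterprise job jumps ahead of the two free jobs enqueued within the previous two minutes, but not ahead of the one that has already been waiting longer than that.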

If you have been using v0.dev, you have been using Inngest function run priority in the wild. And if you have a paid account with v0.dev, you should have seen the benefits.

Control Capacity for a Better User Experience

v0.dev is an excellent example of what we are talking about here — a successful product that relies heavily on AI and has to control capacity. Without using methods of flow control, like throttling, concurrency, debouncing, chaining, and prioritization, there would only be two options:

  1. Vercel developers would have to build these functionalities from scratch, which would take time away from the core product and slow product development.
  2. The user experience would be worse as the product would stall from hitting rate limits and overuse, and paid users wouldn't be able to generate UIs quickly, making it less attractive for those users and thus less attractive for Vercel to build.

With Inngest, services built on top of AI APIs can manage their capacity without having to think about managing rates, state, or retries. This cuts costs while allowing developers to build better products for users.

If you are building with AI and need this functionality, sign up for Inngest for free, read more about AI use cases, or reach out to chat with us about how Inngest can help you build better AI-powered products.
