Streaming AI with Effect

With AI, streaming is not optional anymore. But streams are complex, aren't they? Well, not really with Effect. This is how I use Effect to make AI requests with streams, parse the results, and wire it all together, server to client.


Sandro Maglione

Streams πŸ‘»

For a while, a "stream" sounded complex to me: keep the connection open, handle multiple values over time, figure out errors, and put it all together.

But then came Effect and its Stream module, and it all turned back into something simple πŸ’πŸΌβ€β™‚οΈ

Here is an example: streaming AI responses with @effect/ai and Stream πŸ‘‡


AI API? Stream it

If yesterday streams were more "niche", today they are required. Thanks AI.

But can you avoid streams and just wait for the full AI response before sending it? πŸ€”

That's what I did at the beginning (MVP). It worked for a (really short) while, before 10+ seconds of waiting time crushed the experience of the app 🫠

Needless to say, I converted all the requests to streams. And once again, welcome to Effect πŸ‘‡

Effect AI package

@effect/ai solves your AI problems (streaming included).

With @effect/ai you compose an AI client and an AI model, all wired together with Effect's dependency injection πŸ—οΈ

Once again, as easy as it gets:

import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google";
import { FetchHttpClient } from "@effect/platform";
import { Config, Layer } from "effect";

// πŸ‘‡ AI client (provider)
const Gemini = GoogleClient.layerConfig({
  apiKey: Config.redacted("GEMINI_API_KEY"),
}).pipe(Layer.provide(FetchHttpClient.layer));

// πŸ‘‡ AI model (composed with AI client)
const Gemini25FlashLite = GoogleLanguageModel.model("gemini-2.5-flash-lite").pipe(
  Layer.provide(Gemini)
);

All you need then is a LanguageModel and you are connected to AI πŸ€–

import { LanguageModel } from "@effect/ai";
import { Effect } from "effect";

export class Ai extends Effect.Service<Ai>()("Ai", {
  dependencies: [Gemini25FlashLite],
  effect: Effect.gen(function* () {
    const model = yield* LanguageModel.LanguageModel;

    // model.streamText is next ⚑️
  }),
}) {}

That's all there is to it:

  1. Define a client and a model
  2. Provide the API key using Config
  3. Extract a LanguageModel and make requests

Streaming AI with Effect

AI setup done, streaming requests are just as simple: streamText.

// πŸ‘‡ inside the Ai service's Effect.gen (Prompt comes from "@effect/ai")
const model = yield* LanguageModel.LanguageModel;

return model.streamText({
  prompt: Prompt.make([
    {
      role: "system",
      content: "You are a language learning tutor, correct the sentence",
    },
    {
      role: "user",
      content: "η°‘ε˜γ§γ—γŸγ­",
    },
  ]),
});

streamText returns a Stream, so we are fully in Effect territory (no need to deal with the AI provider's API details).

This is all, really. Now that we have a Stream, let's do something with it πŸ‘‡

NDJSON AI streams

In my app I want to stream responses to the client one by one as they come. A single full JSON payload won't work; instead, the answer is streamed in NDJSON format (Newline Delimited JSON).
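For a concrete picture of the format (plain TypeScript, nothing from @effect/ai; the toNdjson/fromNdjson helpers below are hypothetical, just to illustrate): NDJSON is simply one JSON value per line.

```typescript
// NDJSON: one JSON value per line (illustrative helpers, not a library API)
const toNdjson = (values: ReadonlyArray<unknown>): string =>
  values.map((value) => JSON.stringify(value)).join("\n");

const fromNdjson = (text: string): Array<unknown> =>
  text
    .split("\n")
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line));

// Each value sits on its own line, so the client can parse every line
// as soon as it arrives, without waiting for the rest of the response.
const ndjson = toNdjson([{ delta: "Hello" }, { delta: " world" }]);
```

That per-line independence is exactly what makes NDJSON a good fit for streaming schema-validated values.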

Guess what? Effect solved NDJSON as well πŸ™Œ

We have a Stream, we want to extract schemas from it, and stream them one by one to the client as soon as they are ready.

A single module and problem solved: Ndjson.

A Channel is applied to a Stream to convert its content from some input to some output (with error handling included) πŸͺ„

// πŸ‘‡ Define a `Channel` from `Ndjson` to `inputSchema`
const ndjson = Ndjson.unpackSchemaString(inputSchema)<AiError.AiError>();

// πŸ‘‡ `Stream.Stream<Uint8Array<ArrayBufferLike>, ParseResult.ParseError>`
const responseStream = model.streamText(/* ... */).pipe(
  Stream.filter((part) => part.type === "text-delta"),
  Stream.map((part) => part.delta),
  Stream.changes,
  Stream.pipeThroughChannelOrFail(ndjson),
  Stream.zipWithIndex,
  Stream.mapEffect(Schema.encode(Schema.parseJson(outputSchema))),
  Stream.intersperse("\n"),
  Stream.encodeText
);

return HttpServerResponse.stream(responseStream);

Aside from the details, notice how the Stream module lets you pipe a series of operations over the content of the stream.

All readable, type safe, errors and schemas included πŸš€
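On the client, the response body arrives as binary chunks whose boundaries may split a line in half. A minimal sketch of an incremental NDJSON reader (plain TypeScript using only the standard TextDecoder; makeNdjsonReader is a hypothetical name, not part of Effect):

```typescript
// Hypothetical incremental NDJSON reader: feed it binary chunks as they
// arrive (e.g. from response.body), get parsed values back as soon as a
// full line is available. Partial lines are buffered until complete.
const makeNdjsonReader = <A>() => {
  const decoder = new TextDecoder();
  let buffer = "";

  return (chunk: Uint8Array): Array<A> => {
    buffer += decoder.decode(chunk, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the trailing partial line for later
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as A);
  };
};
```

With fetch, you would loop over `response.body!.getReader()` and pass every chunk through the reader, updating the UI per parsed value instead of waiting for the whole response.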


It took me some time to get to this final setup, but now AI is solved.

Everything I needed was already there in Effect (once again). And it's only getting better (looking at you, v4 πŸ”œ).

See you next πŸ‘‹
