โ€ข

tech

Streaming AI with Effect

With AI, streaming is not optional anymore. But streams are complex, aren't they? Well, not really with Effect. This is how I use Effect to make AI requests with streams, parse the result, and wire all together server to client.


Sandro Maglione

Sandro Maglione

Software

Streams ๐Ÿ‘ป

For a while a "stream" sounded complex to me: keep the connection, multiple values over time, figure out errors and how to put all together.

But then effect, Stream, and it all came back to something simple ๐Ÿ’๐Ÿผโ€โ™‚๏ธ

Here is an example: streaming AI responses with @effect/ai and Stream ๐Ÿ‘‡


AI API? Stream it

If yesterday streams were more "niche", today they are required. Thanks AI.

But can you avoid streams, and just wait for AI and send the full response? ๐Ÿค”

That's what I did at the beginning (MVP). It worked for a (really short) while, before the 10+ seconds of waiting time crushed the experience of the app ๐Ÿซ 

Needless to say, I converted all the requests to streams. And once again, welcome to effect ๐Ÿ‘‡

Effect AI package

@effect/ai solves your AI problems (and streaming as well).

With @effect/ai you compose AI client and AI model, all using Effect's dependency injection ๐Ÿ—๏ธ

Once again, as easy as it gets:

import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google";

// ๐Ÿ‘‡ AI client (provider)
const Gemini = GoogleClient.layerConfig({
  apiKey: Config.redacted("GEMINI_API_KEY"),
}).pipe(Layer.provide(FetchHttpClient.layer));

// ๐Ÿ‘‡ AI model (composed with AI client)
const Gemini25FlashLite = GoogleLanguageModel.model("gemini-2.5-flash").pipe(
  Layer.provide(Gemini)
);

All you need then is a LanguageModel and you are connected with AI ๐Ÿค–

export class Ai extends Effect.Service<Ai>()("Ai", {
  dependencies: [Gemini25FlashLite],
  effect: Effect.gen(function* () {
    const model = yield* LanguageModel.LanguageModel;
    
    // model.streamText is next โšก๏ธ
  }),
}

This is all:

  1. Define a client and model
  2. Provide api key using Config
  3. Extract a LanguageModel and make requests

Streaming AI with effect

AI setup done, streaming requests just as simple: streamText.

const model = yield* LanguageModel.LanguageModel;

return model.streamText({
  prompt: Prompt.make([
    {
      role: "system",
      content: "You are a language learning tutor, correct the sentence",
    },
    {
      role: "user",
      content: "็ฐกๅ˜ใงใ—ใŸใญ",
    },
  ]),
});

streamText returns a Stream, all into effect territory here (don't bother with AI API details and such).

This is all, really. Now we have a Stream, let's do something with it ๐Ÿ‘‡

NDJSON AI streams

In my app I want to stream responses one by one to the client as they come. A full JSON won't work, instead the answer is NDJSON format (Newline Delimited JSON).

Guess what? Effect solved NDJSON as well ๐Ÿ™Œ

We have a Stream, we want to extract schemas from it, and stream them one by one to the client as soon as they are ready.

A single module and problem solved: Ndjson.

A Channel is applied to a Stream to convert its content from some input to some output (with error handling included) ๐Ÿช„

// ๐Ÿ‘‡ Define a `Channel` from `Ndjson` to `inputSchema`
const ndjson = Ndjson.unpackSchemaString(inputSchema)<AiError.AiError>();

// ๐Ÿ‘‡ `Stream.Stream<Uint8Array<ArrayBufferLike>, ParseResult.ParseError>`
const responseStream = model.streamText(/* ... */).pipe(
  Stream.filter((part) => part.type === "text-delta"),
  Stream.map((part) => part.delta),
  Stream.changes,
  Stream.pipeThroughChannelOrFail(ndjson),
  Stream.zipWithIndex,
  Stream.mapEffect(Schema.encode(Schema.parseJson(outputSchema))),
  Stream.intersperse("\n"),
  Stream.encodeText
);

return HttpServerResponse.stream(responseStream);

Aside from the details, notice how the Stream module allows to pipe a series of operations on the content of the stream.

All readable, type safe, errors and schemas included ๐Ÿš€


It took me some time to get to this final setup, but now AI is solved.

All I needed was found in effect (once again). And it's only getting better (looking at you v4 ๐Ÿ”œ).

See you next ๐Ÿ‘‹

Start here.

Every week I dive headfirst into a topic, uncovering every hidden nook and shadow, to deliver you the most interesting insights

Not convinced? Well, let me tell you more about it