Streams 📻
For a while, a "stream" sounded complex to me: keep the connection open, handle multiple values over time, figure out errors, and put it all together.
But then came effect and Stream, and it all came down to something simple 🙋🏼‍♂️
Here is an example: streaming AI responses with @effect/ai and Stream 👇
AI API? Stream it
If yesterday streams were more of a "niche" thing, today they are required. Thanks, AI.
But can you avoid streams and just wait for the full AI response before sending it back? 🤔
That's what I did at the beginning (MVP). It worked for a (really short) while, before the 10+ seconds of waiting time crushed the experience of the app 😫
Needless to say, I converted all the requests to streams. And once again, welcome to effect 🙂
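In effect terms, the difference is just how you run the Stream. A minimal sketch (the chunks and timing are made up):
import { Console, Effect, Stream } from "effect";

// A stand-in for an AI response: three chunks, one second apart (hypothetical)
const chunks = Stream.make("Hello", " from", " AI").pipe(
  Stream.tap(() => Effect.sleep("1 second"))
);

// MVP style: wait for everything, then respond (the user stares at a spinner)
const waitForAll = Stream.runCollect(chunks);

// Streaming style: handle each chunk as soon as it arrives
const streamIt = Stream.runForEach(chunks, (chunk) => Console.log(chunk));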
Effect AI package
@effect/ai solves your AI problems (and streaming as well).
With @effect/ai you compose an AI client and an AI model, all using Effect's dependency injection 🏗️
Once again, as easy as it gets:
import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google";
import { FetchHttpClient } from "@effect/platform";
import { Config, Layer } from "effect";
// π AI client (provider)
const Gemini = GoogleClient.layerConfig({
apiKey: Config.redacted("GEMINI_API_KEY"),
}).pipe(Layer.provide(FetchHttpClient.layer));
// π AI model (composed with AI client)
const Gemini25FlashLite = GoogleLanguageModel.model("gemini-2.5-flash-lite").pipe(
Layer.provide(Gemini)
);

All you need then is a LanguageModel and you are connected with AI 🤖
import { LanguageModel } from "@effect/ai";
import { Effect } from "effect";

export class Ai extends Effect.Service<Ai>()("Ai", {
dependencies: [Gemini25FlashLite],
effect: Effect.gen(function* () {
const model = yield* LanguageModel.LanguageModel;
// model.streamText is next ➡️
}),
}) {}

This is all:
- Define a client and a model
- Provide the API key using Config
- Extract a LanguageModel and make requests
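Putting the pieces together, a program only needs the Ai service and its generated Default layer. A minimal sketch (what you do with the model inside is up to you):
const program = Effect.gen(function* () {
  const ai = yield* Ai;
  // ... call the model through the service here
});

program.pipe(
  Effect.provide(Ai.Default), // layer generated by Effect.Service, dependencies included
  Effect.runPromise
);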
Streaming AI with effect
With the AI setup done, streaming requests are just as simple: streamText.
// Inside the Ai service from above; Prompt also comes from "@effect/ai"
const model = yield* LanguageModel.LanguageModel;
return model.streamText({
prompt: Prompt.make([
{
role: "system",
content: "You are a language learning tutor, correct the sentence",
},
{
role: "user",
content: "η°‘εγ§γγγ",
},
]),
});

streamText returns a Stream: we are fully in effect territory here (no need to bother with AI API details and such).
This is all, really. Now that we have a Stream, let's do something with it 👇
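For a quick first test, you can drain the stream and log each chunk as it arrives. A minimal sketch (Console and Stream come from effect):
const logDeltas = Effect.gen(function* () {
  const model = yield* LanguageModel.LanguageModel;
  yield* model.streamText({
    prompt: Prompt.make([{ role: "user", content: "Hello!" }]),
  }).pipe(
    // Keep only the text chunks and print them as soon as they arrive
    Stream.filter((part) => part.type === "text-delta"),
    Stream.runForEach((part) => Console.log(part.delta))
  );
});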
NDJSON AI streams
In my app I want to stream responses to the client one by one, as they come. A single JSON document won't work; instead, the response uses the NDJSON format (Newline Delimited JSON).
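For illustration, NDJSON is simply one complete JSON value per line. A hypothetical payload for the tutor example could look like this (field names are made up):
{"correction":"簡単でした。","note":"Already correct!"}
{"correction":"...","note":"..."}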
Guess what? Effect solved NDJSON as well 🎉
We have a Stream, we want to extract schemas from it, and stream them one by one to the client as soon as they are ready.
A single module and problem solved: Ndjson.
A Channel is applied to a Stream to convert its content from some input to some output (with error handling included) 💪
import { AiError } from "@effect/ai";
import { Ndjson } from "@effect/experimental";
import { HttpServerResponse } from "@effect/platform";
import { Schema, Stream } from "effect";

// 👇 Define a `Channel` from `Ndjson` to `inputSchema`
const ndjson = Ndjson.unpackSchemaString(inputSchema)<AiError.AiError>();

// 👇 `Stream.Stream<Uint8Array<ArrayBufferLike>, ParseResult.ParseError>`
const responseStream = model.streamText(/* ... */).pipe(
Stream.filter((part) => part.type === "text-delta"),
Stream.map((part) => part.delta),
Stream.changes,
Stream.pipeThroughChannelOrFail(ndjson),
Stream.zipWithIndex,
Stream.mapEffect(Schema.encode(Schema.parseJson(outputSchema))),
Stream.intersperse("\n"),
Stream.encodeText
);
return HttpServerResponse.stream(responseStream);

Aside from the details, notice how the Stream module lets you pipe a series of operations over the content of the stream.
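For reference, inputSchema and outputSchema are plain Schema values, and their exact shape depends on the app. A hypothetical pair for the tutor example (note that Stream.zipWithIndex pairs each value with its index, so the output is a tuple):
// Hypothetical: what the model is prompted to emit as NDJSON
const inputSchema = Schema.Struct({
  correction: Schema.String,
  note: Schema.String,
});

// Hypothetical: each line sent to the client, as a [value, index] tuple
const outputSchema = Schema.Tuple(inputSchema, Schema.Number);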
All readable, type-safe, errors and schemas included 🙂
It took me some time to get to this final setup, but now AI is solved.
All I needed was found in effect (once again). And it's only getting better (looking at you, v4 👀).
See you next 👋
