StepflowStepflow

Stream Trigger

Generic trigger for any async iterable

Stream Trigger

The Stream trigger is a generic trigger that can consume items from any AsyncIterable. This is useful for creating custom triggers or for integrating with systems that provide async stream interfaces.

Installation

pnpm add @stepflowjs/trigger-stream

Usage

import { StreamTrigger } from "@stepflowjs/trigger-stream";

// Example async generator
async function* numberStream() {
  for (let i = 0; i < 10; i++) {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield i;
  }
}

const trigger = new StreamTrigger({
  source: "number-stream",
  stream: numberStream(),
  transform: (n) => ({ value: n, squared: n * n }),
});

await trigger.start(async (event) => {
  console.log("Received stream item:", event.data);
  await stepflow.trigger("process-number", event.data);
});

Helper Function

You can also use the createStreamTrigger helper function:

import { createStreamTrigger } from "@stepflowjs/trigger-stream";

const trigger = createStreamTrigger("events", eventStream, (event) => ({
  ...event,
  processed: true,
}));

Configuration

OptionTypeDescription
sourcestringIdentifier for the stream source
streamAsyncIterable<T>The async iterable to consume
transform(item: T) => unknownOptional transform function to apply to each item

On this page