Skip to content

ストリームの作成

このセクションでは、Effect のStreamを作成するさまざまな方法を探ります。これらの方法により、ニーズに合わせたストリームを生成できます。

一般的なコンストラクタ

make

Stream.makeコンストラクタを使用して、純粋なストリームを作成できます。このコンストラクタは、変数のリストを引数として受け取ります。

import { Stream, Effect } from "effect";
const stream = Stream.make(1, 2, 3);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

empty

時には、何の値も生成しないストリームが必要となる場合があります。そのような場合には、Stream.emptyを使用します。このコンストラクタは、空のままのストリームを作成します。

import { Stream, Effect } from "effect";
const stream = Stream.empty;
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [] }

void

単一のvoid値を含むストリームが必要な場合、Stream.voidを使用できます。このコンストラクタは、単一のイベントやシグナルを表すストリームを作成するのに便利です。

import { Stream, Effect } from "effect";
const stream = Stream.void;
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ undefined ] }

range

指定した範囲[min, max]内の整数ストリームを作成するには、Stream.rangeを使用します(両端点、つまりminおよびmaxを含む)。これは、連続した数字のストリームを生成するのに特に便利です。

import { Stream, Effect } from "effect";
// 1から5までの数値のストリームを作成
const stream = Stream.range(1, 5);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

iterate

Stream.iterateを使用すると、初期値に関数を適用して反復的にストリームを生成できます。初期値はストリームによって生成される最初の要素となり、その後の値はf(init), f(f(init))のように生成されます。

import { Stream, Effect } from "effect";
// 増加する数字のストリームを作成
const stream = Stream.iterate(1, (n) => n + 1); // 1, 2, 3, ...
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

scoped

Stream.scopedは、スコープ内のリソースから単一の値を持つストリームを作成するために使用されます。この機能は、明示的な取得、使用、および解放を必要とするリソースを扱う際に便利です。

import { Stream, Effect, Console } from "effect";
// スコープ内のリソースから単一の値を持つストリームを作成
const stream = Stream.scoped(
Effect.acquireUseRelease(
Console.log("acquire"),
() => Console.log("use"),
() => Console.log("release")
)
);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
/*
出力:
acquire
use
release
{ _id: 'Chunk', values: [ undefined ] }
*/

成功と失敗から

Effectデータ型と同様に、failおよびsucceed関数を使用してStreamを生成できます:

import { Stream, Effect } from "effect";
// エラーを発生させることができるストリームを作成
const streamWithError: Stream.Stream<never, string> = Stream.fail("Uh oh!");
Effect.runPromise(Stream.runCollect(streamWithError));
// throws Error: Uh oh!
// 数値を発生させるストリームを作成
const streamWithNumber: Stream.Stream<number> = Stream.succeed(5);
Effect.runPromise(Stream.runCollect(streamWithNumber)).then(console.log);
// { _id: 'Chunk', values: [ 5 ] }

チャンクから

Chunkからストリームを構築するには、以下のようにします:

import { Stream, Chunk, Effect } from "effect";
// 単一のチャンクから値を持つストリームを作成
const stream = Stream.fromChunk(Chunk.make(1, 2, 3));
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

さらに、複数のChunkからストリームを作成することもできます:

import { Stream, Chunk, Effect } from "effect";
// 複数のチャンクから値を持つストリームを作成
const stream = Stream.fromChunks(Chunk.make(1, 2, 3), Chunk.make(4, 5, 6));
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

Effect から

Stream.fromEffectコンストラクタを使用して Effect ワークフローからストリームを生成できます。例えば、以下のストリームは単一のランダムな数値を生成します:

import { Stream, Random, Effect } from "effect";
const stream = Stream.fromEffect(Random.nextInt);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// 出力例: { _id: 'Chunk', values: [ 1042302242 ] }

この方法を使用すると、Effect の出力をストリームにシームレスに変換でき、ストリーム内の非同期操作を簡単に扱えるようになります。

非同期コールバックから

コールバックに依存した非同期関数があると仮定しましょう。それらのコールバックによって発生する結果をストリームとしてキャプチャしたい場合、Stream.async関数を使用できます。この関数は、コールバックを複数回呼び出し、その結果をストリームとして発行するように設計されています。

以下の例での使用方法を見てみましょう:

import { Stream, Effect, Chunk, Option, StreamEmit } from "effect";
const events = [1, 2, 3, 4];
const stream = Stream.async(
(emit: StreamEmit.Emit<never, never, number, void>) => {
events.forEach((n) => {
setTimeout(() => {
if (n === 3) {
emit(Effect.fail(Option.none())); // ストリームを終了
} else {
emit(Effect.succeed(Chunk.of(n))); // 現在のアイテムをストリームに追加
}
}, 100 * n);
});
}
);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2 ] }

StreamEmit.Emit<R, E, A, void>型は、複数回呼び出すことができる非同期コールバックを表します。このコールバックは、Effect<Chunk<A>, Option<E>, R>型の値を取ります。この型が述べる各結果の意味は次のとおりです:

  • コールバックに渡された値が成功した場合にChunk<A>を生成すると、それらの要素がストリームの一部として発行されることを示します。

  • コールバックに渡された値がSome<E>で失敗すると、指定されたエラーでストリームが終了することを示します。

  • コールバックに渡された値がNoneで失敗すると、ストリームの終了を示すシグナルとして機能し、ストリームが終了します。

簡単に言えば、この型を使用すると、非同期コールバックがストリームとどのように相互作用し、要素を発行するタイミング、エラーで終了するタイミング、ストリームの終了をシグナルするタイミングを指定できます。

反復可能なオブジェクトから

fromIterable

Stream.fromIterableコンストラクタを使用すると、値のIterableから純粋なストリームを作成できます。これは、値のコレクションをストリームに変換する簡単な方法です。

import { Stream, Effect } from "effect";
const numbers = [1, 2, 3];
const stream = Stream.fromIterable(numbers);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

fromIterableEffect

Iterable型の値を生成するエフェクトがある場合、Stream.fromIterableEffectコンストラクタを使用して、そのエフェクトからストリームを生成できます。

例えば、データベース操作がユーザーのリストを取得すると仮定しましょう。この操作はエフェクトを伴うので、Stream.fromIterableEffectを使用して結果をStreamに変換できます:

import { Stream, Effect, Context } from "effect";
class Database extends Context.Tag("Database")<
Database,
{ readonly getUsers: Effect.Effect<Array<string>> }
>() {}
const getUsers = Database.pipe(Effect.andThen((_) => _.getUsers));
const stream = Stream.fromIterableEffect(getUsers);
Effect.runPromise(
Stream.runCollect(
stream.pipe(
Stream.provideService(Database, {
getUsers: Effect.succeed(["user1", "user2"]),
})
)
)
).then(console.log);
// { _id: 'Chunk', values: [ 'user1', 'user2' ] }

これにより、エフェクトをシームレスに扱い、その結果をストリームに変換してさらなる処理を行うことができます。

fromAsyncIterable

非同期イテラブルもストリームに変換できるデータソースの一つです。Stream.fromAsyncIterableコンストラクタを使用することで、非同期データソースを扱い、エラーも適切に処理できます。

import { Stream, Effect } from "effect";
const myAsyncIterable = async function* () {
yield 1;
yield 2;
};
const stream = Stream.fromAsyncIterable(
myAsyncIterable(),
(e) => new Error(String(e)) // エラーハンドリング
);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 1, 2 ] }

このコードでは、非同期イテラブルを定義し、それからstreamという名前のストリームを作成しています。また、変換過程で発生する可能性のあるエラーを処理するためのエラーハンドラ関数も提供しています。

繰り返しから

単一の値の繰り返し

Stream.repeatValueコンストラクタを使用して、特定の値を無限に繰り返すストリームを作成できます:

import { Stream, Effect } from "effect";
const stream = Stream.repeatValue(0);
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
// { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }

ストリームの内容の繰り返し

Stream.repeatを使用すると、特定のストリームの内容をスケジュールに従って繰り返すストリームを作成できます。これは、定期的なイベントや値を生成するのに便利です。

import { Stream, Effect, Schedule } from "effect";
// 無限に値を繰り返すストリームを作成
const stream = Stream.repeat(Stream.succeed(1), Schedule.forever);
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }

エフェクトの結果の繰り返し

エフェクトによる API コールがあり、その結果を使用してストリームを作成したい場合、エフェクトからストリームを作成し、それを無限に繰り返すことで実現できます。

以下は、ランダムな数値のストリームを生成する例です:

import { Stream, Effect, Random } from "effect";
const stream = Stream.repeatEffect(Random.nextInt);
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
/*
出力例:
{
_id: 'Chunk',
values: [ 1666935266, 604851965, 2194299958, 3393707011, 4090317618 ]
}
*/

終了条件付きでのエフェクトの繰り返し

指定された条件に基づいてストリームを終了する方法で、エフェクトを評価し続けることができます。

この例では、Iteratorからストリームを作成しています:

import { Stream, Effect, Option } from "effect";
const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
Stream.repeatEffectOption(
Effect.sync(() => it.next()).pipe(
Effect.andThen((res) => {
if (res.done) {
return Effect.fail(Option.none());
}
return Effect.succeed(res.value);
})
)
);

タイムティックの生成

Stream.tickコンストラクタを使用して、指定された間隔でvoid値を発行するストリームを作成できます。これは、定期的なイベントを作成するのに便利です。

import { Stream, Effect } from "effect";
const stream = Stream.tick("100 millis");
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
/*
出力:
{
_id: 'Chunk',
values: [ undefined, undefined, undefined, undefined, undefined ]
}
*/

展開/ページングから

関数型プログラミングにおけるunfoldの概念は、foldの対のように考えることができます。

foldでは、データ構造を処理して戻り値を生成します。例えば、Array<number>を取り、その要素の合計を計算することができます。

一方、unfoldは、初期値から始まり、指定された状態関数を使用して 1 つずつ要素を追加しながら再帰的データ構造を生成する操作を示します。例えば、1から始まる自然数の系列を作成し、increment関数を状態関数として使用できます。

unfold

unfold

Stream モジュールには、次のように定義されたunfold関数があります:

declare const unfold: <S, A>(
initialState: S,
step: (s: S) => Option.Option<readonly [A, S]>
) => Stream<A>;

使い方は以下の通りです:

  • initialState。これは初期状態の値です。
  • step。状態関数stepは、現在の状態sを入力として取ります。この関数の結果がNoneの場合、ストリームは終了します。もしSome<[A, S]>であれば、次のストリームの要素はAであり、次のステッププロセスのために状態Sが更新されます。

例えば、Stream.unfoldを使って自然数のストリームを作成してみましょう:

import { Stream, Effect, Option } from "effect";
const stream = Stream.unfold(1, (n) => Option.some([n, n + 1]));
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

unfoldEffect

場合によっては、展開操作中にエフェクトフルな状態変換が必要になることがあります。この場合、Stream.unfoldEffectが役立ちます。これを使用すると、ストリームを生成する際にエフェクトを扱えます。

次に、Stream.unfoldEffectを使用して無限のランダム1-1値のストリームを作成する例を示します:

import { Stream, Effect, Option, Random } from "effect";
const stream = Stream.unfoldEffect(1, (n) =>
Random.nextBoolean.pipe(
Effect.map((b) => (b ? Option.some([n, -n]) : Option.some([n, n])))
)
);
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(
console.log
);
// 出力例: { _id: 'Chunk', values: [ 1, 1, 1, 1, -1 ] }

その他のバリアント

Chunkデータ型に対応するStream.unfoldChunkStream.unfoldChunkEffectなどの類似操作があります。

ページング

paginate

Stream.paginateStream.unfoldに似ていますが、値を 1 ステップ先に発行することができます。

例えば、次のストリームは0, 1, 2, 3という要素を発行します:

import { Stream, Effect, Option } from "effect";
const stream = Stream.paginate(0, (n) => [
n,
n < 3 ? Option.some(n + 1) : Option.none(),
]);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }

使い方は以下の通りです:

  • 初期値として0から始まります。
  • 提供された関数は現在の値nを取り、タプルを返します。タプルの最初の要素は発行する値(n)であり、2 番目の要素は続けるか(Option.some(n + 1))停止するか(Option.none())を決定します。

その他のバリアント

Chunkデータ型に対応するStream.paginateChunkStream.paginateChunkEffectの類似操作もあります。

展開とページングの違い

unfoldpaginateのコンビネータの違いは何か、また、どちらを使用すべきかについて考えるかもしれません。次の例を見てみましょう。

例えば、ページネーションされた API があり、かなりの量のデータがページネーション方式で提供されるとします。この API にリクエストを送ると、現在のページの結果と最終ページであるか、次のページにデータがあるかどうかを示すフラグを含むResultPageオブジェクトが返されます。以下は、私たちの API の簡略化した表現です:

import { Chunk, Effect } from "effect";
export type RawData = string;
export class PageResult {
constructor(
readonly results: Chunk.Chunk<RawData>,
readonly isLast: boolean
) {}
}
const pageSize = 2;
export const listPaginated = (
pageNumber: number
): Effect.Effect<PageResult, Error> => {
return Effect.succeed(
new PageResult(
Chunk.map(
Chunk.range(1, pageSize),
(index) => `Result ${pageNumber}-${index}`
),
pageNumber === 2 // 3ページ返す
)
);
};
// @include: domain

ゴールは、このページネーション API をRowDataイベントのストリームに変換することです。初期の試みとして、Stream.unfold操作を使用しようとするかもしれません:

domain.ts
// @include: domain
// @filename: firstAttempt.ts
// ---cut---
import { Effect, Stream, Option } from "effect";
import { RawData, listPaginated } from "./domain";
const firstAttempt: Stream.Stream<RawData, Error> = Stream.unfoldChunkEffect(
0,
(pageNumber) =>
listPaginated(pageNumber).pipe(
Effect.map((page) => {
if (page.isLast) {
return Option.none();
}
return Option.some([page.results, pageNumber + 1] as const);
})
)
);
Effect.runPromise(Stream.runCollect(firstAttempt)).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ "Result 0-1", "Result 0-2", "Result 1-1", "Result 1-2" ]
}
*/

ただし、このアプローチには欠点があります。最後のページの結果が含まれません。それを補うために、結果を取得するために追加の API コールを実行します。

domain.ts
// @include: domain
// @filename: firstAttempt.ts
// ---cut---
import { Effect, Stream, Option } from "effect";
import { RawData, listPaginated } from "./domain";
const secondAttempt: Stream.Stream<RawData, Error> = Stream.unfoldChunkEffect(
Option.some(0),
(pageNumber) =>
Option.match(pageNumber, {
// 最後のページに到達している
onNone: () => Effect.succeed(Option.none()),
// まだ最後のページに到達していない
onSome: (pageNumber) =>
listPaginated(pageNumber).pipe(
Effect.map((page) =>
Option.some([
page.results,
page.isLast ? Option.none() : Option.some(pageNumber + 1),
])
)
),
})
);
Effect.runPromise(Stream.runCollect(secondAttempt)).then(console.log);
/*
出力:
{
_id: 'Chunk',
values: [
'Result 0-1',
'Result 0-2',
'Result 1-1',
'Result 1-2',
'Result 2-1',
'Result 2-2'
]
}
*/

このアプローチは機能しますが、Stream.unfoldはページネーション API からデータを取得する場合に最も友好的な選択肢とは言えません。最後のページの結果を含めるために追加の手間が必要です。

ここでStream.paginateが役立ちます。この API は、ページネーションされた API を Effect ストリームに変換するよりエルゴノミックな方法を提供します。Stream.paginateを使用して解決策を書き換えてみましょう:

domain.ts
// @include: domain
// @filename: finalAttempt.ts
// ---cut---
import { Effect, Stream, Option } from "effect";
import { RawData, listPaginated } from "./domain";
const finalAttempt: Stream.Stream<RawData, Error> = Stream.paginateChunkEffect(
0,
(pageNumber) =>
listPaginated(pageNumber).pipe(
Effect.andThen((page) => {
return [
page.results,
page.isLast ? Option.none<number>() : Option.some(pageNumber + 1),
];
})
)
);
Effect.runPromise(Stream.runCollect(finalAttempt)).then(console.log);
/*
出力:
{
_id: 'Chunk',
values: [
'Result 0-1',
'Result 0-2',
'Result 1-1',
'Result 1-2',
'Result 2-1',
'Result 2-2'
]
}
*/

キューとパブサブから

Effect には、必要な 2 つの重要な非同期メッセージングデータ型であるQueuePubSubがあります。これらのデータ型は、それぞれStream.fromQueueStream.fromPubSubを使用することで簡単にStreamに変換できます。

スケジュールから

スケジュールからストリームを作成できます。このストリームは、スケジュールから出力された各値に対して要素を発行し、スケジュールが続く限り発行を続けます。

import { Effect, Stream, Schedule } from "effect";
// 1秒ごとに合計10回発行される値
const schedule = Schedule.spaced("1 second").pipe(
Schedule.compose(Schedule.recurs(10))
);
const stream = Stream.fromSchedule(schedule);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
/*
出力:
{
_id: 'Chunk',
values: [
0, 1, 2, 3, 4,
5, 6, 7, 8, 9
]
}
*/