Skip to content

ストリームの消費

ストリームを扱う際には、それが生成するデータをどのように消費するかを理解することが重要です。このガイドでは、ストリームを消費するための一般的な方法をいくつか紹介します。

runCollectの使用

ストリームからすべての要素を単一のChunkに集めるには、Stream.runCollect関数を使用します。

import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
const collectedData = Stream.runCollect(stream)
Effect.runPromise(collectedData).then(console.log)
/*
出力:
{
_id: "Chunk",
values: [ 1, 2, 3, 4, 5 ]
}
*/

runForEachの使用

ストリームの要素を消費する別の方法は、Stream.runForEachを使用することです。この関数は、ストリームの各要素を受け取るコールバック関数を引数に取ります。以下はその例です。

import { Stream, Effect, Console } from "effect"
const effect = Stream.make(1, 2, 3).pipe(
Stream.runForEach((n) => Console.log(n))
)
Effect.runPromise(effect).then(console.log)
/*
出力:
1
2
3
undefined
*/

この例では、Stream.runForEachを使って各要素をコンソールにログ出力しています。

フォールド操作の使用

Stream.fold関数は、ストリームから生成された値に対してフォールド操作を行い、その結果を含むエフェクトを返すことでストリームを消費する別の方法です。以下にいくつかの例を示します。

import { Stream, Effect } from "effect"
const e1 = Stream.make(1, 2, 3, 4, 5).pipe(Stream.runFold(0, (a, b) => a + b))
Effect.runPromise(e1).then(console.log) // 出力: 15
const e2 = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFoldWhile(
0,
(n) => n <= 3,
(a, b) => a + b
)
)
Effect.runPromise(e2).then(console.log) // 出力: 6

最初の例(e1)では、Stream.runFoldを使用してすべての要素の合計を計算しています。二つ目の例(e2)では、Stream.runFoldWhileを使って特定の条件が満たされるまで合計を計算しています。

Sinkの使用

Sinkを使用してストリームを消費するには、SinkStream.run関数に渡します。以下にその例を示します。

import { Stream, Sink, Effect } from "effect"
const effect = Stream.make(1, 2, 3).pipe(Stream.run(Sink.sum))
Effect.runPromise(effect).then(console.log) // 出力: 6

この例では、Sinkを使用してストリーム内の要素の合計を計算しています。