Skip to content

Sinksの作成

ストリームの世界では、Sinks はストリームの要素を消費し処理するために使用されます。ここでは、特定のタスクのために Sinks を作成することを可能にする一般的な Sink コンストラクタをいくつか紹介します。

一般的なコンストラクタ

head Sink は、ストリームの最初の要素をキャプチャする Sink を作成します。ストリームが空の場合、Noneを返します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.head()));
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Option",
_tag: "Some",
value: 1
}
*/

last

last Sink は、ストリームのすべての要素を消費し、ストリームの最後の要素を返します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.last()));
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Option",
_tag: "Some",
value: 4
}
*/

count

count Sink は、ストリームのすべての要素を消費し、与えられた要素の数をカウントします。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.count));
Effect.runPromise(effect).then(console.log);
/*
出力:
4
*/

sum

sum Sink は、ストリームのすべての要素を消費し、受け取った数値の合計を計算します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.sum));
Effect.runPromise(effect).then(console.log);
/*
出力:
10
*/

take

take Sink は、ストリームから指定された数の値を取得し、Chunkデータ型を返します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.take(3)));
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 1, 2, 3 ]
}
*/

drain

drain Sink は、その入力を無視し、実質的に破棄します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.drain));
Effect.runPromise(effect).then(console.log);
/*
出力:
undefined
*/

timed

timed Sink は、ストリームを実行し、その実行時間を計測し、所要時間を提供します。

import { Stream, Schedule, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(
Stream.schedule(Schedule.spaced("100 millis")),
Stream.run(Sink.timed)
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Duration",
_tag: "Millis",
millis: 523
}
*/

forEach

forEach Sink は、与えられた効果のある関数を供給されたすべての要素に対して実行します。

import { Stream, Sink, Console, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(
Stream.run(Sink.forEach(Console.log))
);
Effect.runPromise(effect).then(console.log);
/*
出力:
1
2
3
4
undefined
*/

成功と失敗からの生成

データストリームの領域では、データを保持して操作するストリームを作成するのと同様に、Sink.failおよびSink.succeed関数を使用して Sinks を作成することもできます。

成功する Sink

データストリームから要素を消費せず、数値の値で成功する Sink から始めましょう。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.succeed(0)));
Effect.runPromise(effect).then(console.log);
/*
出力:
0
*/

失敗する Sink

次に、データストリームからの要素を消費せず、意図的に失敗し、タイプstringのエラーメッセージを生成する Sink を考えます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.fail("fail!")));
Effect.runPromiseExit(effect).then(console.log);
/*
出力:
{
_id: 'Exit',
_tag: 'Failure',
cause: { _id: 'Cause', _tag: 'Fail', failure: 'fail!' }
}
*/

収集

すべての要素を収集

データストリームからすべての要素をChunkに収集するには、Sink.collectAll()関数を使用できます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.collectAll()));
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 1, 2, 3, 4 ]
}
*/

HashSet に収集

要素をHashSetに集約したい場合は、Sink.collectAllToSet()を使用できます。この関数は、結果のセットに各要素が一度だけ表示されることを保証します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
Stream.run(Sink.collectAllToSet())
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "HashSet",
values: [ 1, 2, 3, 4 ]
}
*/

HashMap に収集

より高度な収集のニーズには、Sink.collectAllToMap()を使用できます。この関数は、指定されたマージ関数を使用して要素をHashMap<K, A>に集めてマージします。次の例では、マップのキーを(n) => n % 3で決定し、同じキーの要素を(a, b) => a + bでマージします。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
Stream.run(
Sink.collectAllToMap(
(n) => n % 3,
(a, b) => a + b
)
)
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "HashMap",
values: [
[ 0, 6 ], [ 1, 3 ], [ 2, 7 ]
]
}
*/

特定の数の要素を収集

ストリームから特定の数の要素をChunkに収集したい場合、Sink.collectAllN(n)を使用できます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAllN(3)));
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 1, 2, 3 ]
}
*/

条件を満たす間の収集

特定の条件を満たす限り要素を集めたい場合は、Sink.collectAllWhile(predicate)を使用できます。この関数は、条件がfalseを返すまで要素を集め続けます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 0, 4, 0, 6, 7).pipe(
Stream.run(Sink.collectAllWhile((n) => n !== 0))
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 1, 2 ]
}
*/

特定のサイズの HashSet に収集

最大サイズnのセットに集約するために、Sink.collectAllToSetN(n)を利用できます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
Stream.run(Sink.collectAllToSetN(3))
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "HashSet",
values: [ 1, 2, 3 ]
}
*/

限定的なキーの HashMap に収集

最大n個のキーを持つマップに要素を集約する必要がある場合は、Sink.collectAllToMapN(n, keyFunction, mergeFunction)を使用できます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
Stream.run(
Sink.collectAllToMapN(
3,
(n) => n,
(a, b) => a + b
)
)
);
Effect.runPromise(effect).then(console.log);
/*
出力:
{
_id: "HashMap",
values: [
[ 1, 2 ], [ 2, 2 ], [ 3, 6 ]
]
}
*/

畳み込み

左畳み込み

ストリームの数値があり、各要素に操作を適用して単一の値に縮約したいとします。これを実現するには、Sink.foldLeft関数を使用します。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4).pipe(
Stream.run(Sink.foldLeft(0, (a, b) => a + b))
);
Effect.runPromise(effect).then(console.log);
/*
出力:
10
*/

終了条件付きの畳み込み

ストリーム内の要素を畳み込むが、特定の条件が満たされた場合に畳み込みを停止したい場合があります。これを「ショートサーキット」と呼び、終了条件を指定できるSink.fold関数を使用して実現できます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.iterate(0, (n) => n + 1).pipe(
Stream.run(
Sink.fold(
0,
(sum) => sum <= 10,
(a, b) => a + b
)
)
);
Effect.runPromise(effect).then(console.log);
/*
出力:
15
*/

重み付き要素の畳み込み

要素の重みやコストに基づいて畳み込む場合、ある最大コストに達するまで要素を積み上げることができます。これをSink.foldWeightedを使用して行います。次の例では、重みが 1 の要素をまとめ、合計の重みが 3 に達するたびに畳み込みプロセスを再起動します。

import { Stream, Sink, Chunk, Effect } from "effect";
const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
Stream.transduce(
Sink.foldWeighted({
initial: Chunk.empty<number>(),
maxCost: 3,
cost: () => 1,
body: (acc, el) => Chunk.append(acc, el),
})
)
);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [
{
_id: "Chunk",
values: [ 3, 2, 4 ]
}, {
_id: "Chunk",
values: [ 1, 5, 6 ]
}, {
_id: "Chunk",
values: [ 2, 1, 3 ]
}, {
_id: "Chunk",
values: [ 5, 6 ]
}
]
}
*/

制限付きの畳み込み

特定の制限まで要素を畳み込みたい場合は、Sink.foldUntilを使用できます。次の例では、3 つの要素が集まるまで畳み込んでいます。

import { Stream, Sink, Effect } from "effect";
const effect = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
Stream.run(Sink.foldUntil(0, 3, (a, b) => a + b))
);
Effect.runPromise(effect).then(console.log);
/*
出力:
6
*/