Skip to content

並行演算子

このセクションでは、複数のシンクを同時に実行することを可能にする並行演算子について探ります。これらの演算子は、タスクを同時に実行する必要があるときに非常に便利です。

並行結合: 結果をまとめる

2つのシンクを同時に実行し、それらの結果を結合したい場合、Sink.zip を使用します。この操作は、両方のシンクを同時に実行し、その結果をタプルに結合します。

import { Sink, Console, Stream, Schedule, Effect } from "effect"
const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
Sink.as(1)
)
const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
Sink.as(2)
)
const sink = s1.pipe(Sink.zip(s2, { concurrent: true }))
Effect.runPromise(
Stream.make("1", "2", "3", "4", "5").pipe(
Stream.schedule(Schedule.spaced("10 millis")),
Stream.run(sink)
)
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
[ 1, 2 ]
*/

レース: 最初に完了したものが勝ち

もう一つの便利な操作は Sink.race であり、複数のシンクを同時にレースさせることができます。最初に完了したシンクがプログラムの結果を提供します。

import { Sink, Console, Stream, Schedule, Effect } from "effect"
const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
Sink.as(1)
)
const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
Sink.as(2)
)
const sink = s1.pipe(Sink.race(s2))
Effect.runPromise(
Stream.make("1", "2", "3", "4", "5").pipe(
Stream.schedule(Schedule.spaced("10 millis")),
Stream.run(sink)
)
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
1
*/