Skip to content

Fibers

ファイバーとは?

「ファイバー」とは、小さい作業単位や軽量な実行スレッドを指します。プログラム内の特定の計算や効果を持つ操作を表します。ファイバーは並行性や非同期タスクを管理するために使用されます。

ファイバーを特定の作業を行う作業者として考えてみてください。ファイバーは開始、停止、再開、さらには中断することも可能です。ファイバーは、複数のタスクを同時に実行したり、メインプログラムをブロックせずに長時間実行される操作を処理したりする際に便利です。

ファイバーを使用することで、開発者はタスクの実行を制御し、調整できるため、アプリケーションにおける効率的なマルチタスクや応答性を実現できます。

要約すると:

  • Effectは、効果のある計算を説明する高レベルの概念です。遅延実行で不変であり、値を生成したり失敗したりする可能性のある計算を表しますが、即座に実行されるわけではありません。
  • 一方でファイバーは、Effectの実行を表します。結果を取得するために中断したり待機したりできます。それは進行中の計算を制御し、対話する方法と考えてください。

ファイバーの作成

ファイバーは、効果が実行されるたびに作成されます。複数の効果を同時に実行する場合、各並行効果ごとにファイバーが作成されます。

子ファイバーのライフタイム

ファイバーをフォークする際、フォークの仕方によって子ファイバーのライフタイム戦略が4つ異なります。

  1. 自動監視でのフォーク。通常のEffect.fork操作を使用すると、子ファイバーは親ファイバーによって自動的に監視されます。子ファイバーのライフタイムは親ファイバーのライフタイムに結び付いています。つまり、これらのファイバーは自然に終了するか、親ファイバーが終了するまで終了しません。

  2. グローバルスコープでのフォーク(デーモン)。時には、親ファイバーに結び付けられていない長時間実行されるバックグラウンドファイバーを実行したい場合があります。また、グローバルスコープでフォークしたい場合もあります。グローバルスコープでフォークされたファイバーはデーモンファイバーになります。これはEffect.forkDaemon演算子を使用することで実現できます。これらのファイバーは親がいないため監視されず、自然に終了するか、アプリケーションが終了したときに終了します。

  3. ローカルスコープでのフォーク。時には、親ファイバーに結び付けられていないバックグラウンドファイバーを実行したいが、そのファイバーをローカルスコープに留めたい場合もあります。Effect.forkScopedを使用することで、ローカルスコープ内でファイバーをフォークできます。このようなファイバーは親ファイバーの寿命を超えて生存することができ(親によって監視されることはありません)、寿命が尽きるかローカルスコープが閉じられたときに終了します。

  4. 特定のスコープでのフォーク。これは前の戦略と似ていますが、特定のスコープでフォークすることによって子ファイバーのライフタイムをより細かく制御することができます。これはEffect.forkIn演算子を使用することで行えます。

自動監視でのフォーク

Effectは構造化された並行性モデルを採用しており、ファイバーのライフタイムは整然と入れ子になっています。簡単に言えば、ファイバーの寿命はその親ファイバーの寿命に依存しています。

この概念を明確にするために、以下の例を見てみましょう。このシナリオでは、parentファイバーがchildファイバーを生成しています。 childファイバーは完了しない長時間のタスクに従事しています。 重要なのは、Effectがchildファイバーの寿命がparentファイバーを超えることがないことを保証している点です:

import { Effect, Console, Schedule } from "effect"
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
yield* Effect.fork(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
})
Effect.runPromise(parent)

上記のプログラムを実行すると、次の出力が得られます:

Terminal window
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!

このパターンは、いかなるレベルの入れ子のファイバーにも拡張できます。

グローバルスコープでのフォーク(デーモン)

Effect.forkDaemonを使用することで、効果からデーモンファイバーを作成できます。デーモンファイバーのライフタイムはグローバルスコープに結び付いています。 したがって、親ファイバーが終了すると、デーモンファイバーは終了しません。 デーモンファイバーは、グローバルスコープが閉じられるか、その寿命が自然に尽きるまで終了しません。

import { Effect, Console, Schedule } from "effect"
const daemon = Effect.repeat(
Console.log("daemon: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
yield* Effect.forkDaemon(daemon)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
})
Effect.runPromise(parent)

上記のプログラムを実行すると、次の出力が得られます。parentファイバーの寿命が3秒後に終了する一方で、daemonファイバーがまだ実行中であることが示されています:

Terminal window
parent: started!
daemon: still running!
daemon: still running!
daemon: still running!
parent: finished!
daemon: still running!
daemon: still running!
daemon: still running!
daemon: still running!
daemon: still running!
...etc...

親ファイバーを中断しても、daemonファイバーは中断されません:

import { Effect, Console, Schedule, Fiber } from "effect"
const daemon = Effect.repeat(
Console.log("daemon: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
yield* Effect.forkDaemon(daemon)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
}).pipe(Effect.onInterrupt(() => Console.log("parent: interrupted!")))
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(parent)
yield* Effect.sleep("2 seconds")
yield* Fiber.interrupt(fiber)
})
Effect.runPromise(program)

出力:

Terminal window
parent: started!
daemon: still running!
daemon: still running!
parent: interrupted!
daemon: still running!
daemon: still running!
daemon: still running!
daemon: still running!
daemon: still running!
...etc...

ローカルスコープでのフォーク

時には、ファイバーをローカルスコープに付けたいことがあります。そのような場合は、Effect.forkScoped演算子を使用します。この演算子を使用することで、フォークされたファイバーのライフタイムは親ファイバーの寿命を超えることができ、ローカルスコープが閉じられたときに終了します:

import { Effect, Console, Schedule } from "effect"
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
yield* Effect.forkScoped(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
})
const program = Effect.scoped(
Effect.gen(function* () {
console.log("Local scope started!")
yield* Effect.fork(parent)
yield* Effect.sleep("5 seconds")
console.log("Leaving the local scope!")
})
)
Effect.runPromise(program)

上記の例では、ローカルスコープでフォークされたchildファイバーはそのparentファイバーよりも長いライフタイムを持っています。したがって、parentファイバーが終了しても、ローカルスコープ内でchildファイバーは閉じられるまで実行を続けます。 出力を見てみましょう:

Terminal window
Local scope started!
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!
child: still running!
child: still running!
Leaving the local scope!

特定のスコープでのフォーク

より細かい制御が必要な場合、特定のスコープでファイバーをフォークしたい場合があります。その場合、Effect.forkIn演算子を使用して対象スコープを引数として指定できます:

import { Console, Effect, Schedule } from "effect"
const child = Console.log("child: still running!").pipe(
Effect.repeat(Schedule.fixed("1 second"))
)
const program = Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() =>
Console.log("The outer scope is about to be closed!")
)
const outerScope = yield* Effect.scope
yield* Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() =>
Console.log("The inner scope is about to be closed!")
)
yield* Effect.forkIn(child, outerScope)
yield* Effect.sleep("3 seconds")
})
)
yield* Effect.sleep("5 seconds")
})
)
Effect.runPromise(program)

出力:

Terminal window
child: still running!
child: still running!
child: still running!
The inner scope is about to be closed!
child: still running!
child: still running!
child: still running!
child: still running!
child: still running!
child: still running!
The outer scope is about to be closed!

ファイバーはいつ実行されますか?

新しいファイバーは、現在のファイバーが完了するか、イールドした後に実行を開始します。これは、一部のケースで無限ループを防ぐために必要であり、fork APIを使用する際に知っておくと便利です。

以下の例では、SubscriptionRefchangesストリームは単一の値2しか含まれません。それは、ストリームを実行するために作成されたファイバー(forkによって)の開始が、値が更新されたであるためです。

import { Effect, SubscriptionRef, Stream, Console } from "effect"
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
yield* ref.changes.pipe(
Stream.tap((n) => Console.log(`SubscriptionRef changed to ${n}`)),
Stream.runDrain,
Effect.fork
)
yield* SubscriptionRef.set(ref, 1)
yield* SubscriptionRef.set(ref, 2)
})
Effect.runPromise(program)
/*
Output:
SubscriptionRef changed to 2
*/

もし、Effect.yieldNow()を追加して現在のファイバーをイールドさせると、ストリームはすべての値012を含むことになります。これは、ストリームを実行するファイバーが値が変更される前に開始される機会があるからです。

import { Effect, SubscriptionRef, Stream, Console } from "effect"
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
yield* ref.changes.pipe(
Stream.tap((n) => Console.log(`SubscriptionRef changed to ${n}`)),
Stream.runDrain,
Effect.fork
)
yield* Effect.yieldNow()
yield* SubscriptionRef.set(ref, 1)
yield* SubscriptionRef.set(ref, 2)
})
Effect.runPromise(program)
/*
Output:
SubscriptionRef changed to 0
SubscriptionRef changed to 1
SubscriptionRef changed to 2
*/