セマフォ
プログラミングの文脈におけるセマフォは、共有リソースへのアクセスを制御するための同期メカニズムです。Effectにおいて、セマフォはリソースへのアクセスを管理したり、非同期かつ同時実行環境でのタスクを調整したりするために使用されます。では、セマフォの概念とEffect内での動作について詳しく見ていきましょう。
セマフォとは?
セマフォはMutexの一般化です。セマフォは特定の数の許可を持っており、これを異なるエンティティが同時に保持または解放することができます。許可を、エンティティ(例えばタスクやファイバー)が共有リソースにアクセスしたり特定の操作を実行したりするためのチケットとして考えてみてください。もし利用可能な許可がなく、エンティティがそれを取得しようとすると、そのエンティティは許可が取得できるまで中断されます。
次に、非同期タスクを使用した例を見てみましょう:
import { Effect } from "effect"
const task = Effect.gen(function* () { yield* Effect.log("start") yield* Effect.sleep("2 seconds") yield* Effect.log("end")})
const semTask = (sem: Effect.Semaphore) => sem.withPermits(1)(task)
const semTaskSeq = (sem: Effect.Semaphore) => [1, 2, 3].map(() => semTask(sem).pipe(Effect.withLogSpan("elapsed")))
const program = Effect.gen(function* () { const mutex = yield* Effect.makeSemaphore(1) yield* Effect.all(semTaskSeq(mutex), { concurrency: "unbounded" })})
Effect.runPromise(program)/*Output:timestamp=... level=INFO fiber=#1 message=start elapsed=3mstimestamp=... level=INFO fiber=#1 message=end elapsed=2010mstimestamp=... level=INFO fiber=#2 message=start elapsed=2012mstimestamp=... level=INFO fiber=#2 message=end elapsed=4017mstimestamp=... level=INFO fiber=#3 message=start elapsed=4018mstimestamp=... level=INFO fiber=#3 message=end elapsed=6026ms*/ここでは、1つの許可を持つセマフォを使って非同期タスクの実行を同期し制御しています。すべての許可が使用されている場合、許可を取得しようとする追加のタスクは、いくつかの許可が利用可能になるまで待機します。
別のシナリオでは、5つの許可を持つセマフォを作成します。そして、withPermits(n)を利用して、各タスクに対して異なる数の許可を取得・解放します:
import { Effect } from "effect"
const program = Effect.gen(function* () { const sem = yield* Effect.makeSemaphore(5)
yield* Effect.forEach( [1, 2, 3, 4, 5], (n) => sem .withPermits(n)( Effect.delay(Effect.log(`process: ${n}`), "2 seconds") ) .pipe(Effect.withLogSpan("elapsed")), { concurrency: "unbounded" } )})
Effect.runPromise(program)/*Output:timestamp=... level=INFO fiber=#1 message="process: 1" elapsed=2011mstimestamp=... level=INFO fiber=#2 message="process: 2" elapsed=2017mstimestamp=... level=INFO fiber=#3 message="process: 3" elapsed=4020mstimestamp=... level=INFO fiber=#4 message="process: 4" elapsed=6025mstimestamp=... level=INFO fiber=#5 message="process: 5" elapsed=8034ms*/この例では、withPermits(n)を使用して任意の数の許可を取得し解放できることを示しています。この柔軟性により、同時実行の正確な制御が可能になります。
思い出すべき重要な点の1つは、withPermitsが各取得を同等の数の解放に対応させることを保証するということです。これは、タスクが成功した場合でも、失敗した場合でも、中断された場合でも変わりません。