Skip to content

基本的な並行処理

Effect は、ファイバーによって支えられた高い並行性を持つフレームワークです。ファイバーは軽量な 仮想スレッド であり、リソース安全性を持つキャンセル機能を備えているため、Effect における多くの機能を実現可能にします。

このセクションでは、ファイバーの基本を学び、ファイバーを活用する強力な高レベルのオペレーターに慣れ親しむことができます。

仮想スレッドとは?

JavaScript は本質的にシングルスレッドです。つまり、一連の命令を単一の順序で実行します。しかし、現代の JavaScript 環境では、イベントループを使用して非同期操作を管理し、マルチタスクの幻想を生み出します。この文脈において、仮想スレッド、またはファイバーは、Effect ランタイムによってシミュレーションされた論理的なスレッドです。これにより、JavaScript ではネイティブにサポートされていない真のマルチスレッドに依存せずに並行実行が可能となります。

ファイバー

Effect 内のすべてのエフェクトはファイバーによって実行されます。自分でファイバーを作成しなかった場合、それは使用している操作(それが並行している場合)または Effect ランタイムシステムによって作成されました。

非同期操作のない “シングルスレッド” コードを書いた場合でも、常に少なくとも 1 つのファイバーが存在します。それは、あなたのエフェクトを実行する “メイン” ファイバーです。

Effect のファイバーには、実行しているエフェクトに基づいた明確なライフサイクルがあります。

各ファイバーは、その実行しているエフェクトが成功または失敗に応じて、成功または失敗で終了します。

Effect のファイバーは、ユニークな ID、ローカルな状態、およびステータス(完了、実行中、または中断中など)を持っています。

ファイバーデータ型

Effect のファイバーデータ型は、エフェクトの実行に対する「ハンドル」を表します。

Fiber<A, E>データ型には、2 つの型パラメータがあります:

  • A (成功型): ファイバーが成功する可能性のある値の型。
  • E (失敗型): ファイバーが失敗する可能性のある値の型。

ファイバーは、すでに要件が提供されたエフェクトのみを実行するため、R型パラメータを持ちません。

エフェクトのフォーク

ファイバーを作成する基本的な方法の 1 つは、既存のエフェクトをフォークすることです。エフェクトをフォークすると、新しいファイバーでエフェクトの実行が始まり、この新たに作成されたファイバーへの参照が得られます。

以下のコードは、Effect.fork関数を使って単一のファイバーを作成する方法を示しています。このファイバーは、メインファイバーとは独立してfib(10)関数を実行します:

import { Effect } from "effect";
const fib = (n: number): Effect.Effect<number> =>
Effect.suspend(() => {
if (n <= 1) {
return Effect.succeed(n);
}
return fib(n - 1).pipe(Effect.zipWith(fib(n - 2), (a, b) => a + b));
});
const fib10Fiber = Effect.fork(fib(10));

ファイバーの結合

ファイバーに対して一般的な操作は、Fiber.join関数を使用してそれらを結合することです。この関数は、結合したファイバーの結果に基づいて成功または失敗するEffectを返します:

import { Effect, Fiber } from "effect";
const fib = (n: number): Effect.Effect<number> =>
Effect.suspend(() => {
if (n <= 1) {
return Effect.succeed(n);
}
return fib(n - 1).pipe(Effect.zipWith(fib(n - 2), (a, b) => a + b));
});
const fib10Fiber = Effect.fork(fib(10));
const program = Effect.gen(function* () {
const fiber = yield* fib10Fiber;
const n = yield* Fiber.join(fiber);
console.log(n);
});
Effect.runPromise(program); // 55

ファイバーの待機

ファイバーにとって便利なもう 1 つの関数はFiber.awaitです。この関数は、ファイバーの完了状況についての詳細な情報を 제공する Exit 値を含むエフェクトを返します。

import { Effect, Fiber } from "effect";
const fib = (n: number): Effect.Effect<number> =>
Effect.suspend(() => {
if (n <= 1) {
return Effect.succeed(n);
}
return fib(n - 1).pipe(Effect.zipWith(fib(n - 2), (a, b) => a + b));
});
const fib10Fiber = Effect.fork(fib(10));
const program = Effect.gen(function* () {
const fiber = yield* fib10Fiber;
const exit = yield* Fiber.await(fiber);
console.log(exit);
});
Effect.runPromise(program); // { _id: 'Exit', _tag: 'Success', value: 55 }

ファイバーの中断

ファイバーの結果がもはや必要ない場合、それを中断することができ、これによりファイバーが直ちに終了し、すべてのリソースを安全に解放するためにファイバーの完了処理が実行されます。

Fiber.awaitと同様に、Fiber.interruptはファイバーの完了状況を説明する Exit 値を返します。

import { Effect, Fiber } from "effect";
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(Effect.forever(Effect.succeed("Hi!")));
const exit = yield* Fiber.interrupt(fiber);
console.log(exit);
});
Effect.runPromise(program);
/*
出力
{
_id: 'Exit',
_tag: 'Failure',
cause: {
_id: 'Cause',
_tag: 'Interrupt',
fiberId: {
_id: 'FiberId',
_tag: 'Runtime',
id: 0,
startTimeMillis: 1715787137490
}
}
}
*/

設計上、Fiber.interruptによって返されるエフェクトは、ファイバーが完了するまで再開されません。これにより、古いファイバーが終了する前に新しいファイバーが開始されないことが保証されます。この動作は「バックプレッシャー」と呼ばれ、必要に応じてオーバーライドすることができます。

バックプレッシャーが必要ない場合は、中断自体を新しいファイバーにフォークすることができます:

import { Effect, Fiber } from "effect";
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(Effect.forever(Effect.succeed("Hi!")));
const _ = yield* Effect.fork(Fiber.interrupt(fiber));
});

バックグラウンド中断のためのショートハンドとして、Fiber.interruptForkも用意されています。

import { Effect, Fiber } from "effect";
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(Effect.forever(Effect.succeed("Hi!")));
const _ = yield* Fiber.interruptFork(fiber);
});

注意: 高レベル API Effect.interruptを使用して中断を行うこともできます。詳細については、Effect.interruptを参照してください。

ファイバーの合成

Fiber.zipおよびFiber.zipWith関数を使用すると、2 つのファイバーを 1 つのファイバーに結合できます。結果のファイバーは、入力ファイバーの両方の結果を生成します。入力ファイバーのいずれかが失敗すると、合成されたファイバーも失敗します。

Fiber.zipを使用した例は以下の通りです:

import { Effect, Fiber } from "effect";
const program = Effect.gen(function* () {
const fiber1 = yield* Effect.fork(Effect.succeed("Hi!"));
const fiber2 = yield* Effect.fork(Effect.succeed("Bye!"));
const fiber = Fiber.zip(fiber1, fiber2);
const tuple = yield* Fiber.join(fiber);
console.log(tuple);
});
Effect.runPromise(program);
/*
出力:
[ 'Hi!', 'Bye!' ]
*/

ファイバーを合成する別の方法は、Fiber.orElse関数を使用することです。この関数を使用すると、最初のファイバーが失敗した場合に実行される代替のファイバーを指定できます。最初のファイバーが成功した場合は、合成されたファイバーがその結果を返します。最初のファイバーが失敗した場合、合成されたファイバーは 2 番目のファイバーの結果で完了しますが、その結果が成功か失敗かは問いません。

Fiber.orElseを使用した例は以下の通りです:

import { Effect, Fiber } from "effect";
const program = Effect.gen(function* () {
const fiber1 = yield* Effect.fork(Effect.fail("Uh oh!"));
const fiber2 = yield* Effect.fork(Effect.succeed("Hurray!"));
const fiber = Fiber.orElse(fiber1, fiber2);
const message = yield* Fiber.join(fiber);
console.log(message);
});
Effect.runPromise(program);
/*
出力:
Hurray!
*/

並行処理オプション

Effect は、コードの並行化の機会を特定するのに役立つ多くの関数を提供します。これらは Concurrency Options を受け入れます。

たとえば、標準のEffect.zip関数は、2 つのエフェクトを順次結合します。しかし、Effect.zip({_, _, { concurrent: true })という並行バージョンもあり、これを使用すると 2 つのエフェクトを並行して結合できます。

以下の例では、Effect.zipを使用して 2 つのタスクを順次実行します。最初のタスクは 1 秒かかり、2 番目のタスクは 2 秒かかるため、合計所要時間は約 3 秒になります:

import { Effect, Console } from "effect";
const task1 = Effect.delay(Console.log("task1"), "1 second");
const task2 = Effect.delay(Console.log("task2"), "2 seconds");
const program = Effect.zip(task1, task2);
Effect.runPromise(Effect.timed(program)).then(([duration]) =>
console.log(String(duration))
);
/*
出力:
task1
task2
Duration(3s 5ms 369875ns)
*/

この例では、Effect.zipの並行バージョンを使用して 2 つのタスクを並行して実行します。合計所要時間は、最も長いタスクの所要時間、つまり 2 秒にほぼ等しくなります:

import { Effect, Console } from "effect";
const task1 = Effect.delay(Console.log("task1"), "1 second");
const task2 = Effect.delay(Console.log("task2"), "2 seconds");
const program = Effect.zip(task1, task2, { concurrent: true });
Effect.runPromise(Effect.timed(program)).then(([duration]) =>
console.log(String(duration))
);
/*
出力:
task1
task2
Duration(2s 8ms 179666ns)
*/

レース

Effect.race関数を使用すると、複数のエフェクトを並行して競わせ、最初に成功したものの結果を返します。以下はその例です:

import { Effect } from "effect";
const task1 = Effect.delay(Effect.fail("task1"), "1 second");
const task2 = Effect.delay(Effect.succeed("task2"), "2 seconds");
const program = Effect.race(task1, task2);
Effect.runPromise(program).then(console.log);
/*
出力:
task2
*/

この例では、task1は 1 秒後に失敗する設定であり、task2は 2 秒後に成功する設定です。Effect.race関数は両方のタスクを並行して実行し、最初に成功するのはtask2であるため、その結果が返されます。

最初に完了したエフェクトが成功か失敗かにかかわらず処理が必要な場合は、Effect.either関数を使用できます。この関数は結果を Either 型でラップし、結果が成功 (Right) か失敗 (Left) かを示します:

import { Effect } from "effect";
const task1 = Effect.delay(Effect.fail("task1"), "1 second");
const task2 = Effect.delay(Effect.succeed("task2"), "2 seconds");
const program = Effect.race(Effect.either(task1), Effect.either(task2));
Effect.runPromise(program).then(console.log);
/*
出力:
{ _id: 'Either', _tag: 'Left', left: 'task1' }
*/

この例では、task1は 1 秒後に失敗し、task2は 2 秒後に成功します。Effect.eitherを使用することで、プログラムはtask1の結果を返し、失敗(Left)であることを示します。

タイムアウト

非同期タスクを扱う際、合理的な時間内にタスクが完了することを保証することが重要です。Effect は、Effect.timeout関数を使用してエフェクトに時間制限を強制する便利な方法を提供します。この関数は、新しいエフェクトを返し、指定された期間内に元のエフェクトが完了しない場合は TimeoutException で失敗します。

以下は、Effect.timeoutを使用する方法を示す例です:

import { Effect } from "effect";
const task = Effect.delay(Effect.succeed("task1"), "10 seconds");
const program = Effect.timeout(task, "2 seconds");
Effect.runPromise(program);
/*
throws:
TimeoutException
*/

この例では、taskは 10 秒後に成功するエフェクトです。taskEffect.timeoutでラップし、2 秒のタイムアウトを指定することで、結果のプログラムはタスクが許可された時間よりも長くかかるため、TimeoutExceptionで失敗します。

エフェクトがタイムアウトすると、effect ライブラリはそれを自動的に中断し、バックグラウンドでの実行を防ぎます。この中断によって、不必要な作業を停止することによりリソースの効率的な使用が保証されます。