Effectの障害モデル入門
ファイバーの中断処理
並行アプリケーションを開発する際、他のファイバーの実行を中断する必要があるさまざまなケースがあります。例えば:
-
親ファイバーがいくつかの子ファイバーを開始してタスクを実行させ、後に親ファイバーが子ファイバーの一部またはすべての結果が不要であると判断する場合。
-
二つ以上のファイバーが互いに競い合う場合。最初に結果が計算されたファイバーが勝ち、他のすべてのファイバーはもはや必要なく、中断されるべきです。
-
インタラクティブなアプリケーションでは、ユーザーが「停止」ボタンをクリックして、既に実行中のタスクを停止したい場合があります。
-
予期以上に長く走る計算は、タイムアウト操作を使用して中断されるべきです。
-
ユーザー入力に基づいて計算集約的なタスクを実行するアプリケーションがある場合、ユーザーが入力を変更した時に現在のタスクをキャンセルし、別のタスクを実行するべきです。
ポーリング vs. 非同期中断
ファイバーの中断については、単純なアプローチとして、一つのファイバーが他のファイバーを強制終了させることが考えられます。しかし、このアプローチは理想的ではありません。その理由は、ターゲットファイバーが共有状態を変更中であった場合、共有状態が不整合で信頼できない状態になる可能性があるからです。したがって、共有ミュータブル状態の内部一貫性は保証されません。
代わりに、この問題に対処するための 2 つの一般的かつ有効な解決策があります。
-
準非同期中断(中断のポーリング): 命令型言語では、Java のようにポーリングを準非同期シグナル機構として使用することがよくあります。このモデルでは、あるファイバーが別のファイバーに中断要求を送信します。ターゲットファイバーは中断ステータスを継続的にポーリングし、他のファイバーから中断要求を受け取ったかどうかを確認します。中断要求が検出された場合、ターゲットファイバーはできるだけ早く自己を終了させます。
この解決策では、ファイバー自身がクリティカルセクションを処理します。したがって、ファイバーが重要なセクションの真っ最中で中断要求を受け取った場合、その中断を無視し、クリティカルセクションの後までその処理を先延ばしします。
ただし、このアプローチの欠点は、プログラマーが定期的にポーリングを忘れた場合、ターゲットファイバーが応答しなくなり、デッドロックを引き起こす可能性があることです。また、グローバルフラグをポーリングすることは、Effect が従っている機能的なパラダイムに適合しません。
-
非同期中断: 非同期中断では、あるファイバーが別のファイバーを終了させることが許可されています。ターゲットファイバーは中断ステータスをポーリングする責任はありません。代わりに、クリティカルセクション中にターゲットファイバーはそれらの領域の中断可能性を無効化します。これは、グローバル状態をポーリングする必要がない純粋に機能的な解決策です。Effect はこの解決策を中断モデルとして採用しており、これは完全に非同期のシグナル機構です。
この機構は、定期的なポーリングを忘れるという欠点を克服します。また、純粋な機能計算においては、いつでも計算を中断できるため、機能的なパラダイムに完全に適合しています。ただし、クリティカルセクション中は中断が無効化されます。
ファイバーはいつ中断されるのか?
ファイバーが中断される方法や状況はいくつかあります。それぞれを探求し、これらのシナリオを再現する方法を示す例を提供します。
Effect.interrupt の呼び出し
ファイバーは、その特定のファイバーに対してEffect.interruptオペレーターを呼び出すことによって中断されます。
中断なし
import { Effect } from "effect";
const program = Effect.gen(function* () { yield* Effect.log("start"); yield* Effect.sleep("2 seconds"); yield* Effect.log("done");});
Effect.runPromise(program).catch((error) => console.log(`interrupted: ${error}`));/*出力:timestamp=... level=INFO fiber=#0 message=starttimestamp=... level=INFO fiber=#0 message=done*/import { Effect } from "effect";
const program = Effect.log("start").pipe( Effect.andThen(Effect.sleep("2 seconds")), Effect.andThen(Effect.log("done")));
Effect.runPromise(program).catch((error) => console.log(`interrupted: ${error}`));/*出力:timestamp=... level=INFO fiber=#0 message=starttimestamp=... level=INFO fiber=#0 message=done*/中断あり
import { Effect } from "effect";
const program = Effect.gen(function* () { yield* Effect.log("start"); yield* Effect.sleep("2秒"); yield* Effect.interrupt; yield* Effect.log("done");});
Effect.runPromiseExit(program).then(console.log);/*出力:timestamp=... level=INFO fiber=#0 message=start{ _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Interrupt', fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... } }}*/import { Effect } from "effect";
const program = Effect.log("start").pipe( Effect.andThen(Effect.sleep("2秒")), Effect.andThen(Effect.interrupt), Effect.andThen(Effect.log("done")));
Effect.runPromiseExit(program).then(console.log);/*出力:timestamp=... level=INFO fiber=#0 message=start{ _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Interrupt', fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... } }}*/並行効果の中断
Effect.forEachのような関数を使用して複数の並行効果を組み合わせるとき、もし一つの効果が中断されると、他のすべての並行効果も中断されることに注意してください。例を見てみましょう。
import { Effect } from "effect";
const program = Effect.forEach( [1, 2, 3], (n) => Effect.gen(function* () { yield* Effect.log(`start #${n}`); yield* Effect.sleep(`${n} seconds`); if (n > 1) { yield* Effect.interrupt; } yield* Effect.log(`done #${n}`); }), { concurrency: "unbounded" });
Effect.runPromiseExit(program).then((exit) => console.log(JSON.stringify(exit, null, 2)));/*出力:timestamp=... level=INFO fiber=#1 message="start #1"timestamp=... level=INFO fiber=#2 message="start #2"timestamp=... level=INFO fiber=#3 message="start #3"timestamp=... level=INFO fiber=#1 message="done #1"{ "_id": "Exit", "_tag": "Failure", "cause": { "_id": "Cause", "_tag": "Parallel", "left": { "_id": "Cause", "_tag": "Interrupt", "fiberId": { "_id": "FiberId", "_tag": "Runtime", "id": 3, "startTimeMillis": ... } }, "right": { "_id": "Cause", "_tag": "Sequential", "left": { "_id": "Cause", "_tag": "Empty" }, "right": { "_id": "Cause", "_tag": "Interrupt", "fiberId": { "_id": "FiberId", "_tag": "Runtime", "id": 0, "startTimeMillis": ... } } } }}*/import { Effect } from "effect";
const program = Effect.forEach( [1, 2, 3], (n) => Effect.log(`start #${n}`).pipe( Effect.andThen(() => { const effect = Effect.sleep(`${n} seconds`); if (n > 1) { return Effect.andThen(effect, () => Effect.interrupt); } else { return effect; } }), Effect.andThen(Effect.log(`done #${n}`)) ), { concurrency: "unbounded" });
Effect.runPromiseExit(program).then((exit) => console.log(JSON.stringify(exit, null, 2)));/*出力:timestamp=... level=INFO fiber=#1 message="start #1"timestamp=... level=INFO fiber=#2 message="start #2"timestamp=... level=INFO fiber=#3 message="start #3"timestamp=... level=INFO fiber=#1 message="done #1"{ "_id": "Exit", "_tag": "Failure", "cause": { "_id": "Cause", "_tag": "Parallel", "left": { "_id": "Cause", "_tag": "Interrupt", "fiberId": { "_id": "FiberId", "_tag": "Runtime", "id": 3, "startTimeMillis": ... } }, "right": { "_id": "Cause", "_tag": "Sequential", "left": { "_id": "Cause", "_tag": "Empty" }, "right": { "_id": "Cause", "_tag": "Interrupt", "fiberId": { "_id": "FiberId", "_tag": "Runtime", "id": 0, "startTimeMillis": ... } } } }}*/この例では、三つの並行タスクを表す配列[1, 2, 3]があります。Effect.forEachを使って各要素を反復処理し、いくつかの操作を行います。Effect.log関数を使用して、各タスクの開始と完了を示すメッセージをログに記録します。
出力を見てみると、n = 1のタスクは正常に開始され、完了します。しかし、n = 2のタスクは終了する前にEffect.interruptで中断されます。その結果、すべてのファイバーが中断され、プログラムは「すべてのファイバーがエラーなしで中断されました」というメッセージで終了します。
この例は、並行効果における中断の働き方を示しています。一つの並行タスクが中断されると、他のすべての並行タスクも中断されます。