ストリームにおけるエラー処理
失敗からの回復
エラーが発生する可能性のあるストリームで作業する際には、これらのエラーを適切に処理する方法を熟知することが重要です。Stream.orElse関数は、失敗から回復し、エラーが発生した場合に別のストリームに切り替えるための強力なツールです。
以下は実用的な例です:
import { Stream, Effect } from "effect";
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.fail("Oh! エラー!")), Stream.concat(Stream.make(4, 5)));
const s2 = Stream.make("a", "b", "c");
const stream = Stream.orElse(s1, () => s2);
Effect.runPromise(Stream.runCollect(stream)).then(console.log);/*Output:{ _id: "Chunk", values: [ 1, 2, 3, "a", "b", "c" ]}*/この例では、`s1`がエラーに遭遇しますが、ストリームを終了するのではなく、`Stream.orElse`を使用して優雅に`s2`に切り替えます。これにより、1つのストリームが失敗してもデータの処理を継続することができます。
`Stream.orElseEither`という変種もあり、[Either](../../../other/data-types/either)データ型を使用して、成功または失敗に基づいて2つのストリームからの要素を区別します:
```ts twoslashimport { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.fail("Oh! エラー!")), Stream.concat(Stream.make(4, 5)))
const s2 = Stream.make("a", "b", "c")
const stream = Stream.orElseEither(s1, () => s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: "Chunk", values: [ { _id: "Either", _tag: "Left", left: 1 }, { _id: "Either", _tag: "Left", left: 2 }, { _id: "Either", _tag: "Left", left: 3 }, { _id: "Either", _tag: "Right", right: "a" }, { _id: "Either", _tag: "Right", right: "b" }, { _id: "Either", _tag: "Right", right: "c" } ]}*/```
`Stream.catchAll`関数は、`Stream.orElse`と比較して高度なエラー処理機能を提供します。`Stream.catchAll`を使用すると、発生した失敗のタイプや値に基づいて判断を下すことができます。
```ts twoslashimport { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.fail("Uh Oh!" as const)), Stream.concat(Stream.make(4, 5)), Stream.concat(Stream.fail("Ouch" as const)))
const s2 = Stream.make("a", "b", "c")
const s3 = Stream.make(true, false, false)
const stream = Stream.catchAll( s1, (error): Stream.Stream<string | boolean> => { switch (error) { case "Uh Oh!": return s2 case "Ouch": return s3 } })
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: "Chunk", values: [ 1, 2, 3, "a", "b", "c" ]}*/```
この例では、`s1`というストリームが2つの異なるタイプのエラーに遭遇します。`Stream.orElse`で行ったように別のストリームに単純に切り替えるのではなく、`Stream.catchAll`を使用して各タイプのエラーをどのように処理するかを正確に決定します。このレベルのエラーレカバリに対する制御により、特定のエラー条件に基づいて異なるストリームやアクションを選択できます。
## 欠陥からの回復
ストリームで作業する際には、ストリーム処理中に発生する可能性のあるさまざまな失敗シナリオに備えておくことが重要です。これに対処するために、`Stream.catchAllCause`関数が強力なソリューションを提供します。これにより、発生し得る任意のタイプの失敗を優雅に処理し、回復することができます。
使用例を示します:
```ts twoslashimport { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.dieMessage("Boom!")), Stream.concat(Stream.make(4, 5)))
const s2 = Stream.make("a", "b", "c")
const stream = Stream.catchAllCause(s1, () => s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: "Chunk", values: [ 1, 2, 3, "a", "b", "c" ]}*/```
この例では、`s1`が欠陥に遭遇する可能性がありますが、アプリケーションがクラッシュするのではなく、`Stream.catchAllCause`を使用して別のストリームである`s2`に優雅に切り替えます。これにより、アプリケーションは堅牢を保ち、予期しない問題に直面してもデータ処理を続けます。
## 一部のエラーからの回復
ストリーム処理では、特定のタイプの失敗から回復する必要がある状況が存在します。`Stream.catchSome`および`Stream.catchSomeCause`関数は、エラーを選択的に処理および緩和できるようにします。
特定のエラーから回復したい場合は、`Stream.catchSome`を使用できます:
```ts twoslashimport { Stream, Effect, Option } from "effect"
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.fail("Oh! エラー!")), Stream.concat(Stream.make(4, 5)))
const s2 = Stream.make("a", "b", "c")
const stream = Stream.catchSome(s1, (error) => { if (error === "Oh! エラー!") { return Option.some(s2) } return Option.none()})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: "Chunk", values: [ 1, 2, 3, "a", "b", "c" ]}*/```
特定の原因から回復するために、`Stream.catchSomeCause`関数を使用できます:
```ts twoslashimport { Stream, Effect, Option, Cause } from "effect"
const s1 = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.dieMessage("Oh! エラー!")), Stream.concat(Stream.make(4, 5)))
const s2 = Stream.make("a", "b", "c")
const stream = Stream.catchSomeCause(s1, (cause) => { if (Cause.isDie(cause)) { return Option.some(s2) } return Option.none()})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: "Chunk", values: [ 1, 2, 3, "a", "b", "c" ]}*/```
## Effectへの回復
ストリーム処理では、エラーを優雅に処理し、必要な場合にはクリーンアップタスクを実行することが重要です。`Stream.onError`関数を使用することで、ストリームにエラーが発生した場合に実行するクリーンアップタスクを指定することができます。
```ts twoslashimport { Stream, Console, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe( Stream.concat(Stream.dieMessage("Oh! Boom!")), Stream.concat(Stream.make(4, 5)), Stream.onError(() => Console.log( "ストリームアプリケーションが終了しました!クリーンアップ中です。" ).pipe(Effect.orDie) ))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:ストリームアプリケーションが終了しました!クリーンアップ中です。error: RuntimeException: Oh! Boom!*/```
## 失敗したストリームの再試行
時には、ストリームが一時的で回復可能な失敗に遭遇することがあります。その場合、`Stream.retry`オペレーターが役立ちます。このオペレーターを使用すると、再試行スケジュールを指定し、そのスケジュールに従ってストリームを再試行します。
以下はその動作を示す例です:
```ts twoslash// @types: nodeimport { Stream, Effect, Schedule } from "effect"import * as NodeReadLine from "node:readline"
const stream = Stream.make(1, 2, 3).pipe( Stream.concat( Stream.fromEffect( Effect.gen(function* () { const s = yield* readLine("数字を入力してください: ") const n = parseInt(s) if (Number.isNaN(n)) { return yield* Effect.fail("NaN") } return n }) ).pipe(Stream.retry(Schedule.exponential("1 second"))) ))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*Output:数字を入力してください: a数字を入力してください: b数字を入力してください: c数字を入力してください: 4{ _id: "Chunk", values: [ 1, 2, 3, 4 ]}*/
const readLine = (message: string): Effect.Effect<string> => Effect.promise( () => new Promise((resolve) => { const rl = NodeReadLine.createInterface({ input: process.stdin, output: process.stdout }) rl.question(message, (answer) => { rl.close() resolve(answer) }) }) )```
この例では、ストリームがユーザーに数字の入力を求めますが、無効な値が入力された場合(例:"a," "b," "c")には"NaN"で失敗します。しかし、`Stream.retry`を指数バックオフスケジュールを使って使用することで、一時的なエラーを処理し、有効な入力が集められるまで再試行を行います。
## エラーの洗練
ストリームで作業しているとき、特定のエラーを選択的に保持し、残りのエラーでストリームを終了したい場合があります。これは`Stream.refineOrDie`関数を使用することで実現できます。
以下はその動作を示す例です:
```ts twoslashimport { Stream, Option } from "effect"
const stream = Stream.fail(new Error())
const res = Stream.refineOrDie(stream, (error) => { if (error instanceof SyntaxError) { return Option.some(error) } return Option.none()})```
この例では、`stream`が最初に一般的な`Error`で失敗します。しかし、`Stream.refineOrDie`を使用して、`SyntaxError`タイプのエラーのみをフィルタリングして保持します。他のエラーは終了し、`SyntaxError`は`refinedStream`に保持されます。
## タイムアウト
ストリームで作業する際には、特定の期間内に値を生成しない場合にストリームを終了するなど、タイムアウトを処理したいシナリオが存在します。このセクションでは、さまざまなオペレーターを使用してタイムアウトを管理する方法を探ります。
### timeout
`Stream.timeout`オペレーターを使用すると、ストリームにタイムアウトを設定できます。指定した期間内にストリームが値を生成しない場合、ストリームは終了します。
```ts twoslashimport { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe( Stream.timeout("2 seconds"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*{ _id: "Chunk", values: []}*/```
### timeoutFail
`Stream.timeoutFail`オペレーターは、タイムアウトとカスタムの失敗メッセージを組み合わせます。ストリームがタイムアウトとなった場合、指定したエラーメッセージで失敗します。
```ts twoslashimport { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe( Stream.timeoutFail(() => "タイムアウト", "2 seconds"))
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Fail', failure: 'タイムアウト' }}*/```
### timeoutFailCause
`Stream.timeoutFailCause`は、`Stream.timeoutFail`に似ており、タイムアウトとカスタムの失敗原因を組み合わせます。ストリームがタイムアウトとなった場合、指定した原因で失敗します。
```ts twoslashimport { Stream, Effect, Cause } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe( Stream.timeoutFailCause(() => Cause.die("タイムアウト"), "2 seconds"))
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)/*Output:{ _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Die', defect: 'タイムアウト' }}*/```
### timeoutTo
`Stream.timeoutTo`オペレーターを使用すると、最初のストリームが指定された期間内に値を生成しない場合に、別のストリームに切り替えることができます。
```ts twoslashimport { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe( Stream.timeoutTo("2 seconds", Stream.make(1, 2, 3)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)/*{ _id: "Chunk", values: [ 1, 2, 3 ]}*/```