Skip to content

リソースフルなストリーム

Stream モジュールでは、ほとんどのコンストラクタがスコープ付きリソースを Stream にリフトするために設計された特別なバリアントを提供しています。これらの特定のコンストラクタを使用することで、リソース管理において本質的に安全なストリームを作成することが可能です。これらのコンストラクタは、ストリームを作成する前にリソースの取得を処理し、ストリームの使用後には適切なクローズを保証します。

Stream はまた、Stream.acquireRelease および Stream.finalizer のコンストラクタを提供しており、これらは Effect.acquireRelease および Effect.addFinalizer と似た機能を持っています。これらのツールを使用することで、ストリームがその操作を終了する前にクリーンアップやファイナライゼーション作業を実行することができます。

取得と解放

このセクションでは、ファイル操作を行う際に Stream.acquireRelease を使用する例を見ていきます。

import { Stream, Console, Effect } from "effect";
// ファイル操作のシミュレーション
const open = (filename: string) =>
Effect.gen(function* () {
yield* Console.log(`Opening ${filename}`);
return {
getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
close: Console.log(`Closing ${filename}`),
};
});
const stream = Stream.acquireRelease(
open("file.txt"),
(file) => file.close
).pipe(Stream.flatMap((file) => file.getLines));
Effect.runPromise(Stream.runCollect(stream)).then(console.log);
/*
Output:
Opening file.txt
Closing file.txt
{
_id: "Chunk",
values: [
[ "Line 1", "Line 2", "Line 3" ]
]
}
*/

このコードスニペットでは、open 関数を使用してファイル操作をシミュレートしています。Stream.acquireRelease 関数を使用してファイルが正しく開かれ、閉じられることを保証し、取得したリソースを使ってファイルの行を処理します。

ファイナライゼーション

このセクションでは、ストリームにおけるファイナライゼーションの概念について探究します。ファイナライゼーションを使用することで、ストリームが終了する前に特定のアクションを実行できます。これは、クリーンアップタスクを実行したり、ストリームに最終的なタッチを加える必要がある場合に特に便利です。

ストリーミングアプリケーションが実行を完了するときに一時ディレクトリをクリーンアップする必要があるシナリオを想像してみましょう。これを Stream.finalizer 関数を使用して達成できます。

import { Stream, Console, Effect } from "effect";
const application = Stream.fromEffect(Console.log("Application Logic."));
const deleteDir = (dir: string) => Console.log(`Deleting dir: ${dir}`);
const program = application.pipe(
Stream.concat(
Stream.finalizer(
deleteDir("tmp").pipe(
Effect.andThen(Console.log("Temporary directory was deleted."))
)
)
)
);
Effect.runPromise(Stream.runCollect(program)).then(console.log);
/*
Output:
Application Logic.
Deleting dir: tmp
Temporary directory was deleted.
{
_id: "Chunk",
values: [ undefined, undefined ]
}
*/

このコード例では、application ストリームによって表されるアプリケーションのロジックから始めます。次に、Stream.finalizer を使用してファイナライゼーションステップを定義します。このステップは、一時ディレクトリを削除し、メッセージをログに記録します。これにより、アプリケーションが実行を完了したときに一時ディレクトリが適切にクリーンアップされます。

Ensuring

このセクションでは、ストリームのファイナライゼーション後にアクションを実行する必要があるシナリオについて探求します。これを達成するために、Stream.ensuring オペレーターを利用します。

アプリケーションが主要なロジックを完了し、いくつかのリソースをファイナライズした後に追加のアクションを実行する必要がある状況を考えてみましょう。この目的のために Stream.ensuring を使用できます。

import { Stream, Console, Effect } from "effect";
const program = Stream.fromEffect(Console.log("Application Logic.")).pipe(
Stream.concat(Stream.finalizer(Console.log("Finalizing the stream"))),
Stream.ensuring(
Console.log("Doing some other works after stream's finalization")
)
);
Effect.runPromise(Stream.runCollect(program)).then(console.log);
/*
Output:
Application Logic.
Finalizing the stream
Doing some other works after stream's finalization
{
_id: "Chunk",
values: [ undefined, undefined ]
}
*/

このコード例では、Application Logic. メッセージで表されるアプリケーションのロジックから始めます。その後、Stream.finalizer を使用してファイナライゼーションステップを指定し、Finalizing the stream をログに記録します。その後、Stream.ensuring を使ってストリームのファイナライゼーション後に追加の作業を行うことを示します。この結果として、Doing some other works after stream's finalization というメッセージが表示され、ファイナライゼーション後のアクションが期待通りに実行されることが保証されます。