Skip to content

SubscriptionRef

SubscriptionRef<A>は、SynchronizedRefの特別な形式です。これにより、現在の値およびその値に加えられた変更の更新を購読し、受け取ることができます。

export interface SubscriptionRef<A> extends Synchronized.SynchronizedRef<A> {
/**
* `Ref`の現在の値とその値のすべての変更を含むストリーム。
*/
readonly changes: Stream<A>;
}

SubscriptionRefに対しては、現在の値を扱うために、getset、またはmodifyなどの通常の操作をすべて実行できます。

changesストリームが、実際の動作が行われる場所です。これにより、現在の値とその後のすべての変更を観察することができます。このストリームを実行するたびに、その時点での現在の値とその後発生した変更が取得できます。

SubscriptionRefを作成するには、初期値を指定してmakeコンストラクタを使用します。

import { SubscriptionRef } from "effect";
const ref = SubscriptionRef.make(0);

SubscriptionRefは、特に複数のオブザーバーが共有状態のすべての変更に反応する必要がある場合に、共有状態をモデル化する際に非常に価値があります。たとえば、関数型リアクティブプログラミングの文脈では、SubscriptionRefの値がアプリケーション状態の一部を表し、各オブザーバーがその状態の変更に基づいてさまざまなユーザーインターフェース要素を更新することができます。

これがどのように機能するかを示すために、「サーバー」が複数の「クライアント」によって観察される値を繰り返し更新するシンプルな例を作成しましょう:

import { Ref, Effect } from "effect";
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever);

server関数は、通常のRefで動作し、SubscriptionRefについては特に気に留めていません。単に値を更新します。

import { Ref, Effect, Stream, Random } from "effect";
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever);
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10);
const chunk = yield* Stream.runCollect(Stream.take(changes, n));
return chunk;
});

同様に、client関数も値のStreamのみで動作し、これらの値のソースについては関心を持ちません。

すべてを統合するために、サーバーを開始し、並行して複数のクライアントインスタンスを立ち上げ、終了時にサーバーをシャットダウンします。このプロセスでSubscriptionRefも作成します。

import { Ref, Effect, Stream, Random, SubscriptionRef, Fiber } from "effect";
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever);
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10);
const chunk = yield* Stream.runCollect(Stream.take(changes, n));
return chunk;
});
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0);
const serverFiber = yield* Effect.fork(server(ref));
const clients = new Array(5).fill(null).map(() => client(ref.changes));
const chunks = yield* Effect.all(clients, { concurrency: "unbounded" });
yield* Fiber.interrupt(serverFiber);
for (const chunk of chunks) {
console.log(chunk);
}
});
Effect.runPromise(program);
/*
Output:
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7, 8, 9 ]
}
*/

この設定により、各クライアントは開始時に現在の値を観察し、その値へのすべての変更を受け取ることができます。

変更はストリームとして表されるため、馴染みのあるストリームオペレーターを使用して、より複雑なプログラムを簡単に構築できます。これらのストリームを変換、フィルタリング、または他のストリームとマージして、より洗練された動作を実現できます。