Skip to content

PubSub

このガイドでは、非同期メッセージハブであるPubSubの概念を探ります。これにより、パブリッシャーはメッセージをPubSubに送信でき、サブスクライバーはこれらのメッセージを受信できます。

Queueとは異なり、Queueでは1つの値が1人のテイカーによって取られるのに対し、PubSubではパブリッシュされた各値がすべてのサブスクライバーに受信されます。

Queueが値を配布する問題に対する最適なソリューションを表すのに対し、PubSubはそれらをブロードキャストする問題に対する最適なソリューションを表しています。

基本的な操作

PubSubの基本的な操作は、PubSub.publishPubSub.subscribeです。

  • publish操作は、タイプAのメッセージをPubSubに送信します。これは、メッセージが正常にパブリッシュされたかどうかを示すエフェクトを返します。
  • subscribe操作は、PubSubにサブスクライブするためのスコープ付きエフェクトを返します。スコープが閉じると自動的にサブスクリプションが解除されます。スコープ内では、PubSubにパブリッシュされたメッセージをデキューするためのDequeueにアクセスできます。

PubSubの使い方を理解するために、以下の例を見てみましょう:

import { Effect, PubSub, Queue, Console } from "effect"
const program = PubSub.bounded<string>(2).pipe(
Effect.andThen((pubsub) =>
Effect.scoped(
Effect.gen(function* () {
const dequeue1 = yield* PubSub.subscribe(pubsub)
const dequeue2 = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, "Hello from a PubSub!")
yield* Queue.take(dequeue1).pipe(Effect.andThen(Console.log))
yield* Queue.take(dequeue2).pipe(Effect.andThen(Console.log))
})
)
)
)
Effect.runPromise(program)
/*
出力:
Hello from a PubSub!
Hello from a PubSub!
*/

サブスクライバーは、サブスクライブしている間にだけPubSubにパブリッシュされたメッセージを受信することに注意することが重要です。特定のメッセージがサブスクライバーに届くようにするためには、メッセージをパブリッシュする前にサブスクリプションを確立していることを確認してください。

PubSubの作成

PubSubモジュールが提供するさまざまなコンストラクターを使用してPubSubを作成できます。

制限付きPubSub

制限付きPubSubは、容量がいっぱいのときにパブリッシャーにバックプレッシャーを適用します。つまり、PubSubが満杯の場合、パブリッシャーはブロックされます。

import { PubSub } from "effect"
const boundedPubSub = PubSub.bounded<string>(2)

バックプレッシャーにより、すべてのサブスクライバーはサブスクライブしている間、すべてのメッセージを受信することが保証されます。ただし、サブスクライバーが遅い場合、メッセージの配信が遅くなる可能性があります。

ドロップ型PubSub

ドロップ型PubSubは、満杯の場合に値を単純に破棄します。PubSub.publish関数は、PubSubが満杯のときにfalseを返します。

import { PubSub } from "effect"
const droppingPubSub = PubSub.dropping<string>(2)

ドロップ型PubSubでは、パブリッシャーは新しい値を公開し続けることができますが、サブスクライバーがすべてのメッセージを受信できる保証はありません。

スライディング型PubSub

スライディング型PubSubは、満杯のときに最も古い値を破棄し、パブリッシュを常に即座に成功させます。

import { PubSub } from "effect"
const slidingPubSub = PubSub.sliding<string>(2)

スライディング型PubSubは、遅いサブスクライバーの影響を受けずにメッセージの配信レートを確保します。ただし、遅いサブスクライバーが一部のメッセージを見逃すリスクは依然として存在します。

無制限PubSub

無制限PubSubは決して満杯にはならず、パブリッシュは常に即座に成功します。

import { PubSub } from "effect"
const unboundedPubSub = PubSub.unbounded<string>()

無制限PubSubでは、すべてのサブスクライバーがすべてのメッセージを受信でき、メッセージ配信が遅れることはありません。ただし、メッセージが消費されるよりも早くパブリッシュされる場合、無制限PubSubは無限に成長する可能性があります。

一般的に、特定のユースケースがない限り、制限付き、ドロップ型、またはスライディング型PubSubを使用することが推奨されます。

PubSubのオペレーター

PubSubは、キューに利用できるオペレーションに類似したさまざまな操作をサポートしています。

複数の値をパブリッシュ

PubSub.publishAllオペレーターを使用して、一度に複数の値をPubSubにパブリッシュできます。

import { Effect, PubSub, Queue, Console } from "effect"
const program = PubSub.bounded<string>(2).pipe(
Effect.andThen((pubsub) =>
Effect.scoped(
Effect.gen(function* () {
const dequeue = yield* PubSub.subscribe(pubsub)
yield* PubSub.publishAll(pubsub, ["Message 1", "Message 2"])
yield* Queue.takeAll(dequeue).pipe(Effect.andThen(Console.log))
})
)
)
)
Effect.runPromise(program)
/*
出力:
{
_id: "Chunk",
values: [ "Message 1", "Message 2" ]
}
*/

サイズの確認

PubSub.capacityPubSub.sizeを使用して、PubSubの容量と現在のサイズを確認できます。

import { Effect, PubSub, Console } from "effect"
const program = PubSub.bounded<number>(2).pipe(
Effect.tap((pubsub) => Console.log(`capacity: ${PubSub.capacity(pubsub)}`)),
Effect.tap((pubsub) =>
PubSub.size(pubsub).pipe(
Effect.andThen((size) => Console.log(`size: ${size}`))
)
)
)
Effect.runPromise(program)
/*
出力:
capacity: 2
size: 0
*/

capacityはPubSub作成時に設定され、その後変更されないためnumberを返します。一方、sizeはPubSub内のメッセージ数が時間とともに変わるため、現在のサイズを決定するエフェクトを返します。

PubSubのシャットダウン

PubSub.shutdownを使用してPubSubをシャットダウンし、PubSub.isShutdownでシャットダウンされたかどうかを確認し、PubSub.awaitShutdownでそのシャットダウンを待機できます。PubSubをシャットダウンすると、関連するすべてのキューもシャットダウンされ、シャットダウンシグナルの適切な伝播が保証されます。

PubSubをエンキューとして使用

ご覧のとおり、PubSub上のオペレーターは、PubSub.publishPubSub.subscribeQueue.offerQueue.takeに置き換えられることを除いて、Queue上のオペレーターと同一です。したがって、Queueの使い方がわかれば、PubSubの使い方も既にわかっていることになります。

実際には、PubSubは書き込み専用のQueueとして見ることができます:

interface PubSub<A> extends Queue.Enqueue<A> {}

ここで、Enqueueタイプは、エンキューのみ可能なキューを表します。キューにエンキューすることは、PubSubに値をパブリッシュし、キューをシャットダウンするようなアクションもPubSubをシャットダウンします。

この多用途性により、書き込みのみのQueueを使用している場所のすべてでPubSubを使用できます。