Skip to content

キュー

Queueは、合成可能で透明なバックプレッシャーを持つ Effect に基づいた軽量のインメモリキューです。完全に非同期で(ロックやブロッキングなし)、純粋関数型で型安全です。

基本操作

Queue<A>は型Aの値を格納し、2 つの基本操作を提供します:

  • Queue.offer: この操作は型Aの値をQueueに追加します。
  • Queue.take: これはQueueから最も古い値を削除して返します。

以下は、これらの基本操作をデモする例です:

import { Effect, Queue } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Queue.offer(queue, 1); // キューに1を追加
const value = yield* Queue.take(queue); // 最も古い値を取得して削除
return value;
});
Effect.runPromise(program).then(console.log); // 出力: 1

キューの作成

Queueは、バウンデッド(容量制限あり)またはアンバウンデッド(容量無制限)のストレージを持つことができます。キューが容量に達したときに新しい値を処理するためのさまざまな戦略から選択できます。

バウンデッドキュー

バウンデッドキューは、満杯のときにバックプレッシャーを提供します。これは、キューが満杯の場合、アイテムを追加しようとすると、空きスペースが利用可能になるまで一時停止することを意味します。

import { Queue } from "effect";
// 容量100のバウンデッドキューを作成
const boundedQueue = Queue.bounded<number>(100);

ドロッピングキュー

ドロッピングキューは、満杯のときに新しいアイテムを単にドロップします。空きができるのを待ちません。

import { Queue } from "effect";
// 容量100のドロッピングキューを作成
const droppingQueue = Queue.dropping<number>(100);

スライディングキュー

スライディングキューは、満杯のときに新しいアイテムを受け入れるために古いアイテムを削除します。

import { Queue } from "effect";
// 容量100のスライディングキューを作成
const slidingQueue = Queue.sliding<number>(100);

アンバウンデッドキュー

アンバウンデッドキューには、容量制限がありません。

import { Queue } from "effect";
// アンバウンデッドキューを作成
const unboundedQueue = Queue.unbounded<number>();

キューへのアイテムの追加

キューに値を追加するには、Queue.offer操作を使用できます:

import { Effect, Queue } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Queue.offer(queue, 1); // キューに1を追加
});

バックプレッシャー付きのキューを使用していて、それが満杯の場合、offer操作は一時停止することがあります。このような場合、Effect.forkを使用して異なる実行コンテキスト(ファイバー)で待機することができます。

import { Effect, Queue, Fiber } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(1);
yield* Queue.offer(queue, 1);
const fiber = yield* Effect.fork(Queue.offer(queue, 2)); // キューが満杯なので一時停止
yield* Queue.take(queue);
yield* Fiber.join(fiber);
});

Queue.offerAllを使用して、一度に複数の値を追加することもできます:

import { Effect, Queue, Array } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
const items = Array.range(1, 10);
yield* Queue.offerAll(queue, items);
return yield* Queue.size(queue);
});
Effect.runPromise(program).then(console.log); // 出力: 10

キューからのアイテムの消費

Queue.take操作は、キューから最も古いアイテムを削除して返します。キューが空の場合は、一時停止し、アイテムがキューに追加されると再開します。Effect.forkを使用して、別の実行コンテキスト(ファイバー)で値を待つこともできます。

import { Effect, Queue, Fiber } from "effect";
const oldestItem = Effect.gen(function* () {
const queue = yield* Queue.bounded<string>(100);
const fiber = yield* Effect.fork(Queue.take(queue)); // キューが空なので一時停止
yield* Queue.offer(queue, "something");
const value = yield* Fiber.join(fiber);
return value;
});
Effect.runPromise(oldestItem).then(console.log); // 出力: something

Queue.pollを使用して最初のアイテムを取得することができます。キューが空の場合はNone、そうでない場合はトップアイテムがSomeにラップされて返されます。

import { Effect, Queue } from "effect";
const polled = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Queue.offer(queue, 10);
yield* Queue.offer(queue, 20);
const head = yield* Queue.poll(queue);
return head;
});
Effect.runPromise(polled).then(console.log);
/*
出力:
{
_id: "Option",
_tag: "Some",
value: 10
}
*/

Queue.takeUpToを使用して、一度に複数のアイテムを取得することもできます。キューに戻すべきアイテムが十分にない場合は、利用可能なアイテムをすべて返します。

import { Effect, Queue } from "effect";
const polled = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Queue.offer(queue, 10);
yield* Queue.offer(queue, 20);
yield* Queue.offer(queue, 30);
const chunk = yield* Queue.takeUpTo(queue, 2);
return chunk;
});
Effect.runPromise(polled).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 10, 20 ]
}
*/

同様に、Queue.takeAllを使用して、すべてのアイテムを一度に取得することができます。キューが空の場合は即座に空のコレクションを提供します。

import { Effect, Queue } from "effect";
const polled = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Queue.offer(queue, 10);
yield* Queue.offer(queue, 20);
yield* Queue.offer(queue, 30);
const chunk = yield* Queue.takeAll(queue);
return chunk;
});
Effect.runPromise(polled).then(console.log);
/*
出力:
{
_id: "Chunk",
values: [ 10, 20, 30 ]
}
*/

キューのシャットダウン

Queue.shutdownを使用すると、offer*take*で一時停止しているすべてのファイバーを中断することができます。また、キューを空にし、すべての将来のoffer*およびtake*呼び出しを即座に終了させます。

import { Effect, Queue, Fiber } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(3);
const fiber = yield* Effect.fork(Queue.take(queue));
yield* Queue.shutdown(queue); // ファイバーを中断
yield* Fiber.join(fiber); // 終了する
});

Queue.awaitShutdownを使用すると、キューがシャットダウンされたときにエフェクトを実行できます。この関数は、キューがシャットダウンするまで待機し、既にシャットダウンされている場合は即座に再開します。

import { Effect, Queue, Fiber, Console } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(3);
const fiber = yield* Effect.fork(
Queue.awaitShutdown(queue).pipe(
Effect.andThen(Console.log("シャットダウンしています"))
)
);
yield* Queue.shutdown(queue);
yield* Fiber.join(fiber);
});
Effect.runPromise(program); // 出力: シャットダウンしています

Offer-only / Take-only キュー

特定のコード部分に対して値をオファーする(Enqueue)または値を取り出す(Dequeue)という排他的な機能が必要な場合があります。Effect は、これを達成するための簡単な方法を提供します。

値をオファーすることに関連するすべての操作はEnqueueインターフェースによって定義されています。以下はその使用例です:

import { Queue } from "effect";
const send = (offerOnlyQueue: Queue.Enqueue<number>, value: number) => {
// このエンキューは値のオファーにのみ使用できます
// @ts-expect-error
Queue.take(offerOnlyQueue);
// 問題なし
return Queue.offer(offerOnlyQueue, value);
};

同様に、値を取り出すことに関連するすべての操作はDequeueインターフェースによって定義されています。以下はその例です:

import { Queue } from "effect";
const receive = (takeOnlyQueue: Queue.Dequeue<number>) => {
// このデキューは値を取り出すためにのみ使用できます
// @ts-expect-error
Queue.offer(takeOnlyQueue, 1);
// 問題なし
return Queue.take(takeOnlyQueue);
};

Queue型はEnqueueDequeueの両方を拡張しており、どの部分のコードにおいてもEnqueueまたはDequeueの振る舞いを強制することが可能です。

import { Effect, Queue } from "effect";
const send = (offerOnlyQueue: Queue.Enqueue<number>, value: number) => {
return Queue.offer(offerOnlyQueue, value);
};
const receive = (takeOnlyQueue: Queue.Dequeue<number>) => {
return Queue.take(takeOnlyQueue);
};
const program = Effect.gen(function* () {
const queue = yield* Queue.unbounded<number>();
// キューに値をオファーする
yield* send(queue, 1);
yield* send(queue, 2);
// キューから値を取り出す
console.log(yield* receive(queue));
console.log(yield* receive(queue));
});
Effect.runPromise(program);
/*
出力:
1
2
*/