Unverified Commit c7982fdf authored by Hong Minhee's avatar Hong Minhee
Browse files

`WorkersMessageQueue` class

parent 8b3d47de
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ To be released. Note that 1.6.0 was skipped due to a mistake in the versioning.
     -  Added `Federation.processQueuedTask()` method.  [[#242]]
     -  Added `Message` type.  [[#242]]
     -  Added `WorkersKvStore` class.  [[#241], [#242]]
     -  Added `WorkersMessageQueue` class.  [[#241], [#242]]

 -  The minimum supported version of Node.js is now 22.0.0.

+2 −0
Original line number Diff line number Diff line
@@ -12,6 +12,8 @@ const mf = new Miniflare({
    { type: "ESModule", path: join(import.meta.dirname ?? ".", "server.js") },
  ],
  kvNamespaces: ["KV1", "KV2", "KV3"],
  queueProducers: ["Q1"],
  queueConsumers: { Q1: { maxBatchSize: 1 } },
  async outboundService(request: Request) {
    const url = new URL(request.url);
    if (url.hostname.endsWith(".test")) {
+9 −1
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ interface TestDefinition {
// @ts-ignore: testDefinitions is untyped
const tests: TestDefinition[] = testDefinitions;
const logs: LogRecord[] = [];
const messageBatches: MessageBatch[] = [];

await configure({
  sinks: {
@@ -98,7 +99,7 @@ export default {
      }
      logs.splice(0, logs.length); // Clear logs
      try {
        await fn({ name, origin: "", step, env });
        await fn({ name, origin: "", step, env, messageBatches });
      } catch (e) {
        failed ??= e;
      }
@@ -130,4 +131,11 @@ export default {
      },
    );
  },
  async queue(
    batch: MessageBatch,
    env: unknown,
    ctx: ExecutionContext
  ): Promise<void> {
    messageBatches.push(batch);
  }
};
+46 −1
Original line number Diff line number Diff line
import type { KVNamespace } from "@cloudflare/workers-types/experimental";
import { delay } from "@es-toolkit/es-toolkit";
import { assertEquals } from "@std/assert";
import { test } from "../testing/mod.ts";
import { WorkersKvStore } from "./cfworkers.ts";
import { WorkersKvStore, WorkersMessageQueue } from "./cfworkers.ts";

test({
  name: "WorkersKvStore",
@@ -30,3 +31,47 @@ test({
    });
  },
});

test({
  name: "WorkersMessageQueue",
  ignore: !("navigator" in globalThis &&
    navigator.userAgent === "Cloudflare-Workers"),
  async fn(t) {
    const { env, messageBatches } = t as unknown as {
      env: Record<string, Queue>;
      messageBatches: MessageBatch[];
    };
    const queue = new WorkersMessageQueue(env.Q1);
    await queue.enqueue({ foo: 1, bar: 2 });
    await waitFor(() => messageBatches.length > 0, 5000);
    assertEquals(messageBatches.length, 1);
    assertEquals(messageBatches[0].queue, "Q1");
    assertEquals(messageBatches[0].messages.length, 1);
    assertEquals(messageBatches[0].messages[0].body, { foo: 1, bar: 2 });

    await queue.enqueue(
      { baz: 3, qux: 4 },
      { delay: Temporal.Duration.from({ seconds: 3 }) },
    );
    await delay(2000);
    assertEquals(messageBatches.length, 1);
    await waitFor(() => messageBatches.length > 1, 6000);
    assertEquals(messageBatches[1].queue, "Q1");
    assertEquals(messageBatches[1].messages.length, 1);
    assertEquals(messageBatches[1].messages[0].body, { baz: 3, qux: 4 });
  },
});

async function waitFor(
  predicate: () => boolean,
  timeoutMs: number,
): Promise<void> {
  let delayed = 0;
  while (!predicate()) {
    await delay(500);
    delayed += 500;
    if (delayed > timeoutMs) {
      throw new Error("Timeout");
    }
  }
}
+76 −4
Original line number Diff line number Diff line
import type { KVNamespace } from "@cloudflare/workers-types/experimental";
/**
 * `KvStore` & `MessageQueue` adapters for Cloudflare Workers
 * ==========================================================
 *
 * This module provides `KvStore` and `MessageQueue` implementations that use
 * Cloudflare Workers' KV and Queues bindings, respectively.
 *
 * @module
 * @since 1.6.0
 */
import type {
  KVNamespace,
  MessageSendRequest,
  Queue,
} from "@cloudflare/workers-types/experimental";
import type { KvKey, KvStore, KvStoreSetOptions } from "../federation/kv.ts";
import type {
  MessageQueue,
  MessageQueueEnqueueOptions,
  MessageQueueListenOptions,
} from "../federation/mq.ts";

interface KvMetadata {
  expires?: number;
}

/**
 * Implementation of the KvStore interface for Cloudflare Workers KV binding.
 * This class provides a wrapper around Cloudflare's KV namespace to store and
 * retrieve JSON-serializable values using structured keys.
 * Implementation of the {@link KvStore} interface for Cloudflare Workers KV
 * binding.  This class provides a wrapper around Cloudflare's KV namespace to
 * store and retrieve JSON-serializable values using structured keys.
 * @since 1.6.0
 */
export class WorkersKvStore implements KvStore {
@@ -58,3 +77,56 @@ export class WorkersKvStore implements KvStore {
    return this.#namespace.delete(this.#encodeKey(key));
  }
}

/**
 * Implementation of the {@link MessageQueue} interface for Cloudflare
 * Workers Queues binding.  This class provides a wrapper around Cloudflare's
 * Queues to send messages to a queue.
 *
 * Note that this implementation does not support the `listen()` method,
 * as Cloudflare Workers Queues do not support message consumption in the same
 * way as other message queue systems.  Instead, you should use
 * the {@link Federation.processQueuedTask} method to process messages
 * passed to the queue.
 * @since 1.6.0
 */
export class WorkersMessageQueue implements MessageQueue {
  #queue: Queue;

  constructor(queue: Queue) {
    this.#queue = queue;
  }

  // deno-lint-ignore no-explicit-any
  enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> {
    return this.#queue.send(message, {
      contentType: "json",
      delaySeconds: options?.delay?.total("seconds") ?? 0,
    });
  }

  enqueueMany(
    // deno-lint-ignore no-explicit-any
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ): Promise<void> {
    const requests: MessageSendRequest[] = messages.map((msg) => ({
      body: msg,
      contentType: "json",
    }));
    return this.#queue.sendBatch(requests, {
      delaySeconds: options?.delay?.total("seconds") ?? 0,
    });
  }

  listen(
    // deno-lint-ignore no-explicit-any
    _handler: (message: any) => Promise<void> | void,
    _options?: MessageQueueListenOptions,
  ): Promise<void> {
    throw new TypeError(
      "WorkersMessageQueue does not support listen().  " +
        "Use Federation.processQueuedTask() method instead.",
    );
  }
}