Unverified Commit 5b5e643a authored by Hong Minhee's avatar Hong Minhee
Browse files

Add enqueueMany method and fix flaky tests



- Add enqueueMany method to efficiently send multiple messages
- Fix flaky tests by removing dependency on message order:
  - Create separate arrays for tracking messages from each listener
  - Add allMessages array to track all received messages
  - Update assertions to check for message inclusion rather than exact order
  - Use message presence for waitFor conditions instead of array length

Co-Authored-By: default avatarClaude <noreply@anthropic.com>
parent 47273779
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -72,6 +72,9 @@ Changelog

To be released.

 -  Added `AmqpMessageQueue.enqueueMany()` method for efficiently enqueuing
    multiple messages at once.

 -  Updated *@js-temporal/polyfill* to 0.5.0 for Node.js and Bun. On Deno,
    there is no change because the polyfill is not used.

+45 −7
Original line number Diff line number Diff line
@@ -19,23 +19,27 @@ Deno.test("AmqpMessageQueue", async (t) => {
  const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
  const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix });

  const messages: string[] = [];
  const messages1: string[] = [];
  const messages2: string[] = [];
  const allMessages: string[] = [];
  const controller = new AbortController();
  const listening = mq.listen((message: string) => {
    messages.push(message);
    messages1.push(message);
    allMessages.push(message);
  }, { signal: controller.signal });
  const listening2 = mq2.listen((message: string) => {
    messages.push(message);
    messages2.push(message);
    allMessages.push(message);
  }, { signal: controller.signal });

  await t.step("enqueue()", async () => {
    await mq.enqueue("Hello, world!");
  });

  await waitFor(() => messages.length > 0, 15_000);
  await waitFor(() => allMessages.length > 0, 15_000);

  await t.step("listen()", () => {
    assertEquals(messages, ["Hello, world!"]);
    assertEquals(allMessages.includes("Hello, world!"), true);
  });

  let started = 0;
@@ -47,13 +51,47 @@ Deno.test("AmqpMessageQueue", async (t) => {
    );
  });

  await waitFor(() => messages.length > 1, 15_000);
  await waitFor(() => allMessages.includes("Delayed message"), 15_000);

  await t.step("listen() with delay", () => {
    assertEquals(messages, ["Hello, world!", "Delayed message"]);
    assertEquals(allMessages.includes("Delayed message"), true);
    assertGreater(Date.now() - started, 3_000);
  });

  await t.step("enqueueMany()", async () => {
    await mq.enqueueMany(["Message 1", "Message 2", "Message 3"]);
  });

  await waitFor(() =>
    allMessages.includes("Message 1") &&
    allMessages.includes("Message 2") &&
    allMessages.includes("Message 3"), 15_000);

  await t.step("listen() after enqueueMany()", () => {
    assertEquals(allMessages.includes("Message 1"), true);
    assertEquals(allMessages.includes("Message 2"), true);
    assertEquals(allMessages.includes("Message 3"), true);
  });

  let manyStarted = 0;
  await t.step("enqueueMany() with delay", async () => {
    manyStarted = Date.now();
    await mq.enqueueMany(
      ["Delayed batch 1", "Delayed batch 2"],
      { delay: Temporal.Duration.from({ seconds: 3 }) },
    );
  });

  await waitFor(() =>
    allMessages.includes("Delayed batch 1") &&
    allMessages.includes("Delayed batch 2"), 15_000);

  await t.step("listen() after enqueueMany() with delay", () => {
    assertEquals(allMessages.includes("Delayed batch 1"), true);
    assertEquals(allMessages.includes("Delayed batch 2"), true);
    assertGreater(Date.now() - manyStarted, 3_000);
  });

  controller.abort();
  await listening;
  await listening2;
+34 −0
Original line number Diff line number Diff line
@@ -113,6 +113,40 @@ export class AmqpMessageQueue implements MessageQueue {
    );
  }

  async enqueueMany(
    // deno-lint-ignore no-explicit-any
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ): Promise<void> {
    const channel = await this.#getSenderChannel();
    const delay = options?.delay?.total("millisecond");
    let queue: string;
    if (delay == null || delay <= 0) {
      queue = this.#queue;
    } else {
      const delayStr = delay.toLocaleString("en", { useGrouping: false });
      queue = this.#delayedQueuePrefix + delayStr;
      await channel.assertQueue(queue, {
        autoDelete: true,
        durable: this.#durable,
        deadLetterExchange: "",
        deadLetterRoutingKey: this.#queue,
        messageTtl: delay,
      });
    }

    for (const message of messages) {
      channel.sendToQueue(
        queue,
        Buffer.from(JSON.stringify(message), "utf-8"),
        {
          persistent: this.#durable,
          contentType: "application/json",
        },
      );
    }
  }

  async listen(
    // deno-lint-ignore no-explicit-any
    handler: (message: any) => void | Promise<void>,