Unverified Commit 7b169275 authored by Hong Minhee's avatar Hong Minhee
Browse files

Add enqueueMany() method to MessageQueue interface

- Add optional enqueueMany() method to MessageQueue interface for batch sending
- Implement enqueueMany() in InProcessMessageQueue and ParallelMessageQueue
- Update middleware to use enqueueMany() when available for better performance
- Add tests for the new method
- Update documentation
parent 6bb4c90e
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -64,6 +64,9 @@ To be released.
     -  Deprecated the fourth parameter of the `ObjectAuthorizePredicate` type
        in favor of the `RequestContext.getSignedKeyOwner()` method.

 -  Added an optional method `enqueueMany()` to `MessageQueue` interface
    for sending multiple activities at once.

 -  Fixed a bug of the `fedify inbox` command where it had failed to render
    the web interface when the `fedify` command was installed using
    `deno install` command from JSR.
+14 −0
Original line number Diff line number Diff line
@@ -256,6 +256,20 @@ Handle the `~MessageQueueEnqueueOptions.delay` option if provided in
`MessageQueueEnqueueOptions`.  Ensure the method is non-blocking
(use async operations where necessary).

### Implement `~MessageQueue.enqueueMany()` method (optional)

*This API is available since Fedify 1.5.0.*

This method should add multiple messages to your queue system at once.
Handle the `~MessageQueueEnqueueOptions.delay` option if provided in
`MessageQueueEnqueueOptions`.  Ensure the method is non-blocking
(use async operations where necessary).

Although this method is optional, it's recommended to implement it
for better performance when enqueuing multiple messages at once.
Otherwise, Fedify will call `~MessageQueue.enqueue()` for each message
individually, which may be less efficient.

### Implement `~MessageQueue.listen()` method

This method should start a process that listens for new messages.
+62 −30
Original line number Diff line number Diff line
@@ -2311,7 +2311,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
    if (!this.manuallyStartQueue) this._startQueueInternal(ctx.data);
    const carrier: Record<string, string> = {};
    propagation.inject(context.active(), carrier);
    const promises: Promise<void>[] = [];
    const messages: OutboxMessage[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
        type: "outbox",
@@ -2334,8 +2334,13 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
        },
        traceContext: carrier,
      };
      promises.push(this.outboxQueue.enqueue(message));
      messages.push(message);
    }
    const { outboxQueue } = this;
    if (outboxQueue.enqueueMany == null) {
      const promises: Promise<void>[] = messages.map((m) =>
        outboxQueue.enqueue(m)
      );
      const results = await Promise.allSettled(promises);
      const errors = results
        .filter((r) => r.status === "rejected")
@@ -2353,6 +2358,17 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
        }
        throw errors[0];
      }
    } else {
      try {
        await outboxQueue.enqueueMany(messages);
      } catch (error) {
        logger.error(
          "Failed to enqueue activity {activityId} to send later: {error}",
          { activityId: activity.id!.href, error },
        );
        throw error;
      }
    }
  }

  fetch(
@@ -3957,7 +3973,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
    }
    const carrier: Record<string, string> = {};
    propagation.inject(context.active(), carrier);
    const promises: Promise<void>[] = [];
    const messages: OutboxMessage[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
        type: "outbox",
@@ -3974,8 +3990,13 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
        headers: {},
        traceContext: carrier,
      };
      promises.push(this.federation.outboxQueue.enqueue(message));
      messages.push(message);
    }
    const { outboxQueue } = this.federation;
    if (outboxQueue.enqueueMany == null) {
      const promises: Promise<void>[] = messages.map((m) =>
        outboxQueue.enqueue(m)
      );
      const results = await Promise.allSettled(promises);
      const errors: unknown[] = results
        .filter((r) => r.status === "rejected")
@@ -3993,6 +4014,17 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
        }
        throw errors[0];
      }
    } else {
      try {
        await outboxQueue.enqueueMany(messages);
      } catch (error) {
        logger.error(
          "Failed to enqueue activity {activityId} to forward later:\n{error}",
          { activityId: this.activityId, error },
        );
        throw error;
      }
    }
  }
}

+65 −2
Original line number Diff line number Diff line
@@ -43,6 +43,53 @@ test("InProcessMessageQueue", async (t) => {
    assertGreater(Date.now() - started, 3_000);
  });

  // Clear messages array
  while (messages.length > 0) messages.pop();

  await t.step("enqueueMany()", async () => {
    const testMessages = Array.from(
      { length: 5 },
      (_, i) => `Batch message ${i}!`,
    );
    await mq.enqueueMany(testMessages);
  });

  await waitFor(() => messages.length >= 5, 15_000);

  await t.step("listen() [multiple]", () => {
    assertEquals(messages.length, 5);
    for (let i = 0; i < 5; i++) {
      assertEquals(messages[i], `Batch message ${i}!`);
    }
  });

  // Clear messages array
  while (messages.length > 0) messages.pop();

  started = 0;
  await t.step("enqueueMany() with delay", async () => {
    started = Date.now();
    const testMessages = Array.from(
      { length: 3 },
      (_, i) => `Delayed batch ${i}!`,
    );
    await mq.enqueueMany(
      testMessages,
      { delay: Temporal.Duration.from({ seconds: 2 }) },
    );
    assertEquals(messages.length, 0);
  });

  await waitFor(() => messages.length >= 3, 15_000);

  await t.step("listen() [delayed multiple]", () => {
    assertEquals(messages.length, 3);
    assertGreater(Date.now() - started, 2_000);
    for (let i = 0; i < 3; i++) {
      assertEquals(messages[i], `Delayed batch ${i}!`);
    }
  });

  controller.abort();
  await listening;
});
@@ -82,7 +129,7 @@ for (const mqName in queues) {
    }, controller);

    await t.step("enqueue() [single]", async () => {
      await mq.enqueue("Hello, world!");
      await workers.enqueue("Hello, world!");
    });

    await waitFor(() => messages.length > 0, 15_000);
@@ -95,7 +142,7 @@ for (const mqName in queues) {

    await t.step("enqueue() [multiple]", async () => {
      for (let i = 0; i < 20; i++) {
        await mq.enqueue(`Hello, ${i}!`);
        await workers.enqueue(`Hello, ${i}!`);
      }
    });

@@ -108,6 +155,22 @@ for (const mqName in queues) {

    await waitFor(() => messages.length >= 20, 15_000);

    while (messages.length > 0) messages.pop();

    await t.step("enqueueMany()", async () => {
      const messages = Array.from({ length: 20 }, (_, i) => `Hello, ${i}!`);
      await workers.enqueueMany(messages);
    });

    await t.step("listen() [multiple]", async () => {
      await delay(10 * 250 + 500);
      assertGreaterOrEqual(messages.length, 5);
      await waitFor(() => messages.length >= 20, 15_000);
      assertEquals(messages.length, 20);
    });

    await waitFor(() => messages.length >= 20, 15_000);

    controller.abort();
    await listening;

+54 −0
Original line number Diff line number Diff line
@@ -39,6 +39,19 @@ export interface MessageQueue {
   */
  enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void>;

  /**
   * Enqueues multiple messages in the queue.  This operation is optional,
   * and may not be supported by all implementations.  If not supported,
   * Fedify will invoke {@link enqueue} for each message.
   *
   * @param messages The messages to enqueue.
   * @param options Additional options for enqueuing the messages.
   */
  enqueueMany?: (
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ) => Promise<void>;

  /**
   * Listens for messages in the queue.
   * @param handler The handler for messages in the queue.
@@ -107,6 +120,28 @@ export class InProcessMessageQueue implements MessageQueue {
    return Promise.resolve();
  }

  enqueueMany(
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ): Promise<void> {
    if (messages.length === 0) return Promise.resolve();
    const delay = options?.delay == null
      ? 0
      : Math.max(options.delay.total("millisecond"), 0);
    if (delay > 0) {
      setTimeout(
        () => this.enqueueMany(messages, { ...options, delay: undefined }),
        delay,
      );
      return Promise.resolve();
    }
    this.#messages.push(...messages);
    for (const monitorId in this.#monitors) {
      this.#monitors[monitorId as ReturnType<typeof crypto.randomUUID>]();
    }
    return Promise.resolve();
  }

  async listen(
    handler: (message: any) => Promise<void> | void,
    options: MessageQueueListenOptions = {},
@@ -183,6 +218,25 @@ export class ParallelMessageQueue implements MessageQueue {
    return this.queue.enqueue(message, options);
  }

  async enqueueMany(
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ): Promise<void> {
    if (this.queue.enqueueMany == null) {
      const results = await Promise.allSettled(
        messages.map((message) => this.queue.enqueue(message, options)),
      );
      const errors = results
        .filter((r): r is PromiseRejectedResult => r.status === "rejected")
        .map((r) => r.reason);
      if (errors.length > 1) {
        throw new AggregateError(errors, "Failed to enqueue messages.");
      } else if (errors.length === 1) throw errors[0];
      return;
    }
    await this.queue.enqueueMany(messages, options);
  }

  listen(
    handler: (message: any) => Promise<void> | void,
    options: MessageQueueListenOptions = {},