Unverified Commit e0101b9f authored by Hong Minhee's avatar Hong Minhee
Browse files

Separate settings of MQs for inbox and outbox

parent 369ed297
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -8,6 +8,15 @@ Version 1.3.0

To be released.

 -  `MessageQueue`s now can be differently configured for incoming and outgoing
    activities.

     -  Changed the type of `CreateFederationOptions.queue` option to
        `FederationQueueOptions | MessageQueue | undefined` (was `MessageQueue |
        undefined`).
     -  Added `FederationQueueOptions` interface.
     -  Added `FederationStartQueueOptions.queue` option.

 -  Fedify now makes HTTP requests with the proper `User-Agent` header. [[#162]]

     -  Added `getUserAgent()` function.
+58 −0
Original line number Diff line number Diff line
@@ -116,6 +116,64 @@ Further details are explained in the [*Message queue* section](./mq.md).
> will be sent immediately.  This can make delivery of activities unreliable
> and can cause performance issues.

> [!TIP]
> Since Fedify 1.3.0, you can separately configure the message queue for
> incoming and outgoing activities by providing an object with `inbox` and
> `outbox` properties:
>
> ~~~~ typescript twoslash
> // @noErrors: 2353
> import {
>   createFederation,
>   type KvStore,
>   MemoryKvStore,
>   type MessageQueue,
> } from "@fedify/fedify";
> import { PostgresMessageQueue } from "@fedify/postgres";
> import { RedisMessageQueue } from "@fedify/redis";
> import postgres from "postgres";
> import Redis from "ioredis";
>
> createFederation<void>({
> kv: null as unknown as KvStore,
> // ---cut-before---
> queue: {
>   inbox: new PostgresMessageQueue(
>     postgres("postgresql://user:pass@localhost/db")
>   ),
>   outbox: new RedisMessageQueue(() => new Redis()),
> }
> // ---cut-after---
> });
> ~~~~
>
> Or, you can provide a message queue for only the `inbox` or `outbox` by
> omitting the other:
>
> ~~~~ typescript twoslash
> // @noErrors: 2353
> import {
>   createFederation,
>   type KvStore,
>   MemoryKvStore,
>   type MessageQueue,
> } from "@fedify/fedify";
> import { PostgresMessageQueue } from "@fedify/postgres";
> import postgres from "postgres";
>
> createFederation<void>({
> kv: null as unknown as KvStore,
> // ---cut-before---
> queue: {
>   inbox: new PostgresMessageQueue(
>     postgres("postgresql://user:pass@localhost/db")
>   ),
>   // outbox is not provided; outgoing activities will not be queued.
> }
> // ---cut-after---
> });
> ~~~~

[`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue
[`PostgresMessageQueue`]: https://jsr.io/@fedify/postgres/doc/mq/~/PostgresMessageQueue
[@fedify/amqp]: https://github.com/dahlia/fedify-amqp
+136 −0
Original line number Diff line number Diff line
@@ -405,3 +405,139 @@ server process responsive by offloading message processing to worker nodes.
> [!NOTE]
> To ensure that messages are enqueued only from the `NODE_TYPE=web` nodes,
> you should not place the `NODE_TYPE=worker` nodes behind a load balancer.


Using different message queues for different tasks
--------------------------------------------------

*This API is available since Fedify 1.3.0.*

In some cases, you may want to use different message queues for different tasks,
such as using a faster-but-less-persistent queue for outgoing activities and
a slower-but-more-persistent queue for incoming activities.  To achieve this,
you can pass `FederationQueueOptions` to the `CreateFederationOptions.queue`
option.

For example, the following code shows how to use a [`PostgresMessageQueue`] for
the inbox and a [`RedisMessageQueue`] for the outbox:

~~~~ typescript twoslash
// @noErrors: 2353
import {
  createFederation,
  type KvStore,
  MemoryKvStore,
  type MessageQueue,
} from "@fedify/fedify";
import { PostgresMessageQueue } from "@fedify/postgres";
import { RedisMessageQueue } from "@fedify/redis";
import postgres from "postgres";
import Redis from "ioredis";

// ---cut-before---
const federation = createFederation<void>({
// ---cut-start---
  kv: null as unknown as KvStore,
// ---cut-end---
  queue: {
    inbox: new PostgresMessageQueue(
      postgres("postgresql://user:pass@localhost/db")
    ),
    outbox: new RedisMessageQueue(() => new Redis()),
  },
  // ... other options
});
~~~~

Or, you can provide a message queue for only the `inbox` or `outbox` by omitting
the other:

~~~~ typescript twoslash
// @noErrors: 2353
import {
  createFederation,
  type KvStore,
  MemoryKvStore,
  type MessageQueue,
} from "@fedify/fedify";
import { PostgresMessageQueue } from "@fedify/postgres";
import postgres from "postgres";

// ---cut-before---
const federation = createFederation<void>({
// ---cut-start---
  kv: null as unknown as KvStore,
// ---cut-end---
  queue: {
    inbox: new PostgresMessageQueue(
      postgres("postgresql://user:pass@localhost/db")
    ),
    // outbox is not provided; outgoing activities will not be queued
  },
  // ... other options
});
~~~~

When you [manually start a task
worker](#separating-message-processing-from-the-main-process), you can specify
which queue to start (if `queue` is not provided in the options, it will start
all queues).  The following example shows how to start only the `inbox` queue:

::: code-group

~~~~ typescript{11-17} twoslash [Deno]
// @noErrors: 2353
import type { KvStore } from "@fedify/fedify";
import { createFederation } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";

const federation = createFederation<void>({
  queue: new RedisMessageQueue(() => new Redis()),
  manuallyStartQueue: true,  // [!code highlight]
  // ... other options
  // ---cut-start---
  kv: null as unknown as KvStore,
  // ---cut-end---
});

// ---cut-before---
if (Deno.env.get("NODE_TYPE") === "worker") {
  const controller = new AbortController();
  Deno.addSignalListener("SIGINT", () => controller.abort());
  await federation.startQueue(undefined, {
    signal: controller.signal,
    queue: "inbox",  // [!code highlight]
  });
}
~~~~

~~~~ typescript{12-18} twoslash [Node.js/Bun]
// @noErrors: 2353
import type { KvStore } from "@fedify/fedify";
import { createFederation } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";
import process from "node:process";

const federation = createFederation<void>({
  queue: new RedisMessageQueue(() => new Redis()),
  manuallyStartQueue: true,  // [!code highlight]
  // ... other options
  // ---cut-start---
  kv: null as unknown as KvStore,
  // ---cut-end---
});

// ---cut-before---
if (process.env.NODE_TYPE === "worker") {
  const controller = new AbortController();
  process.on("SIGINT", () => controller.abort());
  await federation.startQueue(undefined, {
    signal: controller.signal,
    queue: "inbox",  // [!code highlight]
  });
}
~~~~

:::
+7 −0
Original line number Diff line number Diff line
@@ -26,6 +26,13 @@ export interface FederationStartQueueOptions {
   * The signal to abort the task queue.
   */
  signal?: AbortSignal;

  /**
   * Starts the task worker only for the specified queue.  If unspecified,
   * which is the default, the task worker starts for both the inbox and outbox.
   * @since 1.3.0
   */
  queue?: "inbox" | "outbox";
}

/**
+82 −23
Original line number Diff line number Diff line
@@ -103,11 +103,16 @@ export interface CreateFederationOptions {
  kvPrefixes?: Partial<FederationKvPrefixes>;

  /**
   * The message queue for sending activities to recipients' inboxes.
   * If not provided, activities will not be queued and will be sent
   * immediately.
   * The message queue for sending and receiving activities.  If not provided,
   * activities will not be queued and will be processed immediately.
   *
   * If a `MessageQueue` is provided, both the `inbox` and `outbox` queues
   * will be set to the same queue.
   *
   * If a `FederationQueueOptions` object is provided, you can set the queues
   * separately (since Fedify 1.3.0).
   */
  queue?: MessageQueue;
  queue?: FederationQueueOptions | MessageQueue;

  /**
   * Whether to start the task queue manually or automatically.
@@ -218,6 +223,24 @@ export interface CreateFederationOptions {
  trailingSlashInsensitive?: boolean;
}

/**
 * Configures the task queues for sending and receiving activities.
 * @since 1.3.0
 */
export interface FederationQueueOptions {
  /**
   * The message queue for incoming activities.  If not provided, incoming
   * activities will not be queued and will be processed immediately.
   */
  inbox?: MessageQueue;

  /**
   * The message queue for outgoing activities.  If not provided, outgoing
   * activities will not be queued and will be sent immediately.
   */
  outbox?: MessageQueue;
}

/**
 * Prefixes for namespacing keys in the Deno KV store.
 */
@@ -257,8 +280,10 @@ export function createFederation<TContextData>(
export class FederationImpl<TContextData> implements Federation<TContextData> {
  kv: KvStore;
  kvPrefixes: FederationKvPrefixes;
  queue?: MessageQueue;
  queueStarted: boolean;
  inboxQueue?: MessageQueue;
  outboxQueue?: MessageQueue;
  inboxQueueStarted: boolean;
  outboxQueueStarted: boolean;
  manuallyStartQueue: boolean;
  router: Router;
  nodeInfoDispatcher?: NodeInfoDispatcher<TContextData>;
@@ -335,8 +360,18 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
      } satisfies FederationKvPrefixes),
      ...(options.kvPrefixes ?? {}),
    };
    this.queue = options.queue;
    this.queueStarted = false;
    if (options.queue == null) {
      this.inboxQueue = undefined;
      this.outboxQueue = undefined;
    } else if ("enqueue" in options.queue && "listen" in options.queue) {
      this.inboxQueue = options.queue;
      this.outboxQueue = options.queue;
    } else {
      this.inboxQueue = options.queue.inbox;
      this.outboxQueue = options.queue.outbox;
    }
    this.inboxQueueStarted = false;
    this.outboxQueueStarted = false;
    this.manuallyStartQueue = options.manuallyStartQueue ?? false;
    this.router = new Router({
      trailingSlashInsensitive: options.trailingSlashInsensitive,
@@ -390,16 +425,40 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
  async #startQueue(
    ctxData: TContextData,
    signal?: AbortSignal,
    queue?: keyof FederationQueueOptions,
  ): Promise<void> {
    if (this.queue != null && !this.queueStarted) {
    if (this.inboxQueue == null && this.outboxQueue == null) return;
    const logger = getLogger(["fedify", "federation", "queue"]);
      logger.debug("Starting a task queue.");
      this.queueStarted = true;
      await this.queue?.listen(
    const promises: Promise<void>[] = [];
    if (
      this.inboxQueue != null && (queue == null || queue === "inbox") &&
      !this.inboxQueueStarted
    ) {
      logger.debug("Starting an inbox task worker.");
      this.inboxQueueStarted = true;
      promises.push(
        this.inboxQueue.listen(
          (msg) => this.#listenQueue(ctxData, msg),
          { signal },
        ),
      );
    }
    if (
      this.outboxQueue != null &&
      this.outboxQueue !== this.inboxQueue &&
      (queue == null || queue === "outbox") &&
      !this.outboxQueueStarted
    ) {
      logger.debug("Starting an outbox task worker.");
      this.outboxQueueStarted = true;
      promises.push(
        this.outboxQueue.listen(
          (msg) => this.#listenQueue(ctxData, msg),
          { signal },
        ),
      );
    }
    await Promise.all(promises);
  }

  #listenQueue(ctxData: TContextData, message: Message): Promise<void> {
@@ -475,7 +534,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            "#{attempt}); retry...:\n{error}",
          { ...logData, error },
        );
        this.queue?.enqueue(
        this.outboxQueue?.enqueue(
          {
            ...message,
            attempt: message.attempt + 1,
@@ -592,7 +651,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            recipient: message.identifier,
          },
        );
        this.queue?.enqueue(
        this.inboxQueue?.enqueue(
          {
            ...message,
            attempt: message.attempt + 1,
@@ -636,7 +695,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
    contextData: TContextData,
    options: FederationStartQueueOptions = {},
  ): Promise<void> {
    return this.#startQueue(contextData, options.signal);
    return this.#startQueue(contextData, options.signal, options.queue);
  }

  createContext(baseUrl: URL, contextData: TContextData): Context<TContextData>;
@@ -1728,7 +1787,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
        },
      );
    }
    if (immediate || this.queue == null) {
    if (immediate || this.outboxQueue == null) {
      if (immediate) {
        logger.debug(
          "Sending activity immediately without queue since immediate option " +
@@ -1790,7 +1849,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            ),
        },
      };
      this.queue.enqueue(message);
      this.outboxQueue.enqueue(message);
    }
  }

@@ -1934,7 +1993,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
          inboxContextFactory: context.toInboxContext.bind(context),
          kv: this.kv,
          kvPrefixes: this.kvPrefixes,
          queue: this.queue,
          queue: this.inboxQueue,
          actorDispatcher: this.actorCallbacks?.dispatcher,
          inboxListeners: this.inboxListeners,
          inboxErrorHandler: this.inboxErrorHandler,
@@ -2918,7 +2977,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
      activityId,
      activity: this.activity,
    });
    if (options?.immediate || this.federation.queue == null) {
    if (options?.immediate || this.federation.outboxQueue == null) {
      if (options?.immediate) {
        logger.debug(
          "Forwarding activity immediately without queue since immediate " +
@@ -2965,7 +3024,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
        attempt: 0,
        headers: {},
      };
      this.federation.queue.enqueue(message);
      this.federation.outboxQueue.enqueue(message);
    }
  }
}