Unverified Commit fb5e57aa authored by Hong Minhee's avatar Hong Minhee
Browse files

Let Federation.startQueue() take an AbortSignal

[ci skip]
parent 07eeddb6
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -109,6 +109,18 @@ To be released.
    the activity to be sent only once.  It had added Object Integrity Proofs
    to the activity for every recipient before.

 -  The message queue is now able to be stopped manually by providing
    an `AbortSignal` object to the `Federation.startQueue()` method.

     -  Added the optional second parameter to `Federation.startQueue()` method,
        which is a `FederationStartQueueOptions` object.
     -  Added `FederationStartQueueOptions` interface.
     -  Added the optional second parameter to `MessageQueue.listen()` method,
        which is a `MessageQueueListenOptions` object.
     -  Added `MessageQueueListenOptions` interface.
     -  The return type of `MessageQueue.listen()` method became `Promise<void>`
        (was `void`).

 -  Added `ParallelMessageQueue` class.  [[#106]]

 -  WebFinger responses now include <http://webfinger.net/rel/avatar> links
+23 −8
Original line number Diff line number Diff line
@@ -137,7 +137,11 @@ Create a class that implements the `MessageQueue` interface, which includes
the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods:

~~~~ typescript twoslash
import type { MessageQueue, MessageQueueEnqueueOptions } from "@fedify/fedify";
import type {
  MessageQueue,
  MessageQueueEnqueueOptions,
  MessageQueueListenOptions,
} from "@fedify/fedify";

class CustomMessageQueue implements MessageQueue {
  async enqueue(
@@ -147,7 +151,10 @@ class CustomMessageQueue implements MessageQueue {
    // Implementation here
  }

  listen(handler: (message: any) => Promise<void> | void): void {
  async listen(
    handler: (message: any) => Promise<void> | void,
    options: MessageQueueListenOptions = {},
  ): Promise<void> {
    // Implementation here
  }
}
@@ -166,6 +173,10 @@ This method should start a process that listens for new messages.
When a message is received, it should call the provided `handler` function.
Ensure proper error handling to prevent the listener from crashing.

> [!NOTE]
> A `Promise` object it returns should never resolve unless the given
> `~MessageQueueListenOptions.signal` is triggered.

### Consider additional features

Here's a list of additional features you might want to implement in your
@@ -236,7 +247,7 @@ you can use the `~CreateFederationOptions.manuallyStartQueue` option and

::: code-group

~~~~ typescript{11-15} twoslash [Deno]
~~~~ typescript{11-17} twoslash [Deno]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { createFederation } from "@fedify/fedify";
@@ -255,17 +266,19 @@ const federation = createFederation<void>({
// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (Deno.env.get("NODE_TYPE") === "worker") {
  await federation.startQueue();
  const controller = new AbortController();
  Deno.addSignalListener("SIGINT", () => controller.abort());
  await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

~~~~ typescript{12-16} twoslash [Node.js/Bun]
~~~~ typescript{12-18} twoslash [Node.js/Bun]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { createFederation } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";
import { env } from "node:process";
import process from "node:process";

const federation = createFederation<void>({
  queue: new RedisMessageQueue(() => new Redis()),
@@ -278,8 +291,10 @@ const federation = createFederation<void>({

// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (env.NODE_TYPE === "worker") {
  await federation.startQueue();
if (process.env.NODE_TYPE === "worker") {
  const controller = new AbortController();
  process.on("SIGINT", () => controller.abort());
  await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

+2 −2
Original line number Diff line number Diff line
{
  "devDependencies": {
    "@deno/kv": "^0.8.2",
    "@fedify/fedify": "1.0.0-dev.408",
    "@fedify/redis": "^0.1.1",
    "@fedify/fedify": "1.0.0-dev.409",
    "@fedify/redis": "0.2.0-dev.10",
    "@hono/node-server": "^1.12.2",
    "@js-temporal/polyfill": "^0.4.4",
    "@logtape/logtape": "^0.5.1",
+11 −33
Original line number Diff line number Diff line
@@ -12,11 +12,11 @@ importers:
        specifier: ^0.8.2
        version: 0.8.2
      '@fedify/fedify':
        specifier: 1.0.0-dev.408
        version: 1.0.0-dev.408(web-streams-polyfill@3.3.3)
        specifier: 1.0.0-dev.409
        version: 1.0.0-dev.409(web-streams-polyfill@3.3.3)
      '@fedify/redis':
        specifier: ^0.1.1
        version: 0.1.1(web-streams-polyfill@3.3.3)
        specifier: 0.2.0-dev.10
        version: 0.2.0-dev.10(web-streams-polyfill@3.3.3)
      '@hono/node-server':
        specifier: ^1.12.2
        version: 1.12.2(hono@4.6.1)
@@ -367,14 +367,11 @@ packages:
    resolution: {integrity: sha512-vBZP4NlzfOlerQTnba4aqZoMhE/a9HY7HRqoOPaETQcSQuWEIyZMHGfVu6w9wGtGK5fED5qRs2DteVCjOH60sA==}
    engines: {node: '>=14'}

  '@fedify/fedify@0.10.2':
    resolution: {integrity: sha512-GKxm+NZ1zNPJ9HTbTW+Y2o/HT3Rk6ly+j/GJfjXBFiwm391Ni56VYC8ON/KQsFdFYIp+eNMn1hWr0RbFtMwoOg==}
  '@fedify/fedify@1.0.0-dev.409':
    resolution: {integrity: sha512-Lbk8eiJZrH2u1JeOdjQp9lSt9CtZkBK3LHW5akoV7znVY9TMZ5E6BHWlzoi0HxYJPIpOHXaQOWHo9v3CBnz9QQ==}

  '@fedify/fedify@1.0.0-dev.408':
    resolution: {integrity: sha512-Lln5Ln0lCCgrskhVN4YibDBgTeuHA2nyT2r3szeXi81j6vNGxpRzdHQ+0wBSglrR17X+EtHlMltsrCCmXbDFsQ==}

  '@fedify/redis@0.1.1':
    resolution: {integrity: sha512-oKhOVYLRwkRf/tuePoT3CnwXUKm5rTE7hDg8/qfVo/txmydPPptXzPG10kL24zCv8t18FwmyaE0LlcrUrxd6FQ==}
  '@fedify/redis@0.2.0-dev.10':
    resolution: {integrity: sha512-OMaqK0Hpvl+HpGMTCjszM/DmYrljRGJaA9wTN4pWEtDBhwxwbMsuOlq8AdqsiFx7yJCxmosPL4iwuLRC21FxRg==}

  '@floating-ui/core@1.6.7':
    resolution: {integrity: sha512-yDzVT/Lm101nQ5TCVeK65LtdN7Tj4Qpr9RTXJ2vPFLqtLxwOrpoxAHAJI8J3yYWUc40J0BDBheaitK5SJmno2g==}
@@ -1955,26 +1952,7 @@ snapshots:

  '@fastify/busboy@2.1.1': {}

  '@fedify/fedify@0.10.2(web-streams-polyfill@3.3.3)':
    dependencies:
      '@deno/shim-crypto': 0.3.1
      '@deno/shim-deno': 0.18.2
      '@js-temporal/polyfill': 0.4.4
      '@logtape/logtape': 0.4.3
      '@phensley/language-tag': 1.9.0
      asn1js: 3.0.5
      json-canon: 1.0.1
      jsonld: 8.3.2(web-streams-polyfill@3.3.3)
      multibase: 4.0.6
      multicodec: 3.2.1
      pkijs: 3.2.4
      uri-template-router: 0.0.16
      url-template: 3.1.1
      urlpattern-polyfill: 10.0.0
    transitivePeerDependencies:
      - web-streams-polyfill

  '@fedify/fedify@1.0.0-dev.408(web-streams-polyfill@3.3.3)':
  '@fedify/fedify@1.0.0-dev.409(web-streams-polyfill@3.3.3)':
    dependencies:
      '@deno/shim-crypto': 0.3.1
      '@deno/shim-deno': 0.18.2
@@ -1994,10 +1972,10 @@ snapshots:
    transitivePeerDependencies:
      - web-streams-polyfill

  '@fedify/redis@0.1.1(web-streams-polyfill@3.3.3)':
  '@fedify/redis@0.2.0-dev.10(web-streams-polyfill@3.3.3)':
    dependencies:
      '@deno/shim-deno': 0.18.2
      '@fedify/fedify': 0.10.2(web-streams-polyfill@3.3.3)
      '@fedify/fedify': 1.0.0-dev.409(web-streams-polyfill@3.3.3)
      '@js-temporal/polyfill': 0.4.4
      ioredis: 5.4.1
    transitivePeerDependencies:
+16 −1
Original line number Diff line number Diff line
@@ -17,6 +17,17 @@ import type {
} from "./callback.ts";
import type { Context, RequestContext } from "./context.ts";

/**
 * Options for {@link Federation.startQueue} method.
 * @since 1.0.0
 */
export interface FederationStartQueueOptions {
  /**
   * The signal to abort the task queue.
   */
  signal?: AbortSignal;
}

/**
 * An object that registers federation-related business logic and dispatches
 * requests to the appropriate handlers.
@@ -33,8 +44,12 @@ export interface Federation<TContextData> {
   * This method is useful when you set the `manuallyStartQueue` option to
   * `true` in the {@link createFederation} function.
   * @param contextData The context data to pass to the context.
   * @param options Additional options for starting the queue.
   */
  startQueue(contextData: TContextData): Promise<void>;
  startQueue(
    contextData: TContextData,
    options?: FederationStartQueueOptions,
  ): Promise<void>;

  /**
   * Create a new context.
Loading