Unverified Commit d4357cbb authored by Hong Minhee's avatar Hong Minhee
Browse files

Merge remote-tracking branch 'amqp/main'

parents 8a000b1c 56d8a4ba
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
.claude/settings.local.json
.DS_Store
dist/
deno.lock
node_modules/
repomix-output.xml

amqp/README.md

0 → 100644
+109 −0
Original line number Diff line number Diff line
<!-- deno-fmt-ignore-file -->

@fedify/amqp: AMQP/RabbitMQ driver for Fedify
=============================================

[![JSR][JSR badge]][JSR]
[![npm][npm badge]][npm]
[![GitHub Actions][GitHub Actions badge]][GitHub Actions]

> [!NOTE]
>
> Although it's theoretically possible to be used with any AMQP 0-9-1 broker,
> this package is primarily designed for and tested with [RabbitMQ].

This package provides [Fedify]'s [`MessageQueue`] implementation for AMQP, which
is supported by RabbitMQ:

 -  [`AmqpMessageQueue`]

Here is an example of how to use it:

~~~~ typescript
import { createFederation } from "@fedify/fedify";
import { AmqpMessageQueue } from "@fedify/amqp";
import { connect } from "amqplib";

const federation = createFederation({
  queue: new AmqpMessageQueue(await connect("amqp://localhost")),
  // ... other configurations
});
~~~~

The `AmqpMessageQueue` constructor accepts options as the second
parameter, which can be used to configure the message queue:

~~~~ typescript
new AmqpMessageQueue(await connect("amqp://localhost"), {
  queue: "my_queue",
})
~~~~

For more details, please refer to the docs of [`AmqpMessageQueueOptions`].

[JSR]: https://jsr.io/@fedify/amqp
[JSR badge]: https://jsr.io/badges/@fedify/amqp
[npm]: https://www.npmjs.com/package/@fedify/amqp
[npm badge]: https://img.shields.io/npm/v/@fedify/amqp?logo=npm
[GitHub Actions]: https://github.com/fedify-dev/amqp/actions/workflows/main.yaml
[GitHub Actions badge]: https://github.com/fedify-dev/amqp/actions/workflows/main.yaml/badge.svg
[RabbitMQ]: https://www.rabbitmq.com/
[Fedify]: https://fedify.dev/
[`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore
[`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue
[`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue
[`AmqpMessageQueueOptions`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueueOptions


Installation
------------

### Deno

~~~~ sh
deno add @fedify/amqp
~~~~

### Node.js

~~~~ sh
npm install @fedify/amqp
~~~~

### Bun

~~~~ sh
bun add @fedify/amqp
~~~~


Changelog
---------

### Version 0.4.0

To be released.

### Version 0.3.0

Released on June 25, 2025.

 -  Added `nativeRetrial` option to `AmqpMessageQueueOptions` to enable
    native retrial of messages.

 -  The type of the `AmqpMessageQueue()` constructor's first parameter has been
    changed from `Connection` to `ChannelModel`.

### Version 0.2.0

Released on March 28, 2025.

 -  Added `AmqpMessageQueue.enqueueMany()` method for efficiently enqueuing
    multiple messages at once.

 -  Updated *@js-temporal/polyfill* to 0.5.0 for Node.js and Bun. On Deno,
    there is no change because the polyfill is not used.

### Version 0.1.0

Initial release.  Released on October 14, 2024.

amqp/deno.json

0 → 100644
+47 −0
Original line number Diff line number Diff line
{
  "name": "@fedify/amqp",
  "version": "1.8.0",
  "license": "MIT",
  "exports": {
    ".": "./mod.ts",
    "./mq": "./mq.ts"
  },
  "imports": {
    "@hongminhee/suite": "jsr:@hongminhee/suite@^0.6.3",
    "amqplib": "npm:amqplib@^0.10.8"
  },
  "nodeModulesDir": "auto",
  "unstable": [
    "temporal"
  ],
  "exclude": [
    ".github",
    "node_modules",
    "npm",
    "pnpm-lock.yaml"
  ],
  "tasks": {
    "build": "pnpm build",
    "check": "deno fmt --check && deno lint && deno check *.ts",
    "test": "deno test --allow-net --allow-env",
    "test:node": {
      "dependencies": [
        "build"
      ],
      "command": "node --experimental-transform-types --test"
    },
    "test:bun": {
      "dependencies": [
        "build"
      ],
      "command": "bun test --timeout 15000"
    },
    "test-all": {
      "dependencies": [
        "test",
        "test:node",
        "test:bun"
      ]
    }
  }
}

amqp/mod.ts

0 → 100644
+1 −0
Original line number Diff line number Diff line
export * from "./mq.ts";

amqp/mq.test.ts

0 → 100644
+186 −0
Original line number Diff line number Diff line
import { suite } from "@hongminhee/suite";
import * as temporal from "@js-temporal/polyfill";
import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert";
import { delay } from "@std/async/delay";
// @deno-types="npm:@types/amqplib"
import { type ChannelModel, connect } from "amqplib";
import process from "node:process";
import { AmqpMessageQueue } from "./mq.ts";

let Temporal: typeof temporal.Temporal;
if ("Temporal" in globalThis) {
  Temporal = globalThis.Temporal;
} else {
  Temporal = temporal.Temporal;
}

const AMQP_URL = process.env.AMQP_URL;
const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip;

function getConnection(): Promise<ChannelModel> {
  return connect(AMQP_URL!);
}

test("AmqpMessageQueue", {
  sanitizeOps: false,
  sanitizeExit: false,
  sanitizeResources: false,
}, async () => {
  const conn = await getConnection();
  const conn2 = await getConnection();
  const randomSuffix = Math.random().toString(36).substring(2);
  const queue = `fedify_queue_${randomSuffix}`;
  const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
  const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
  const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix });

  const messages1: string[] = [];
  const messages2: string[] = [];
  const allMessages: string[] = [];
  const controller = new AbortController();
  const listening = mq.listen((message: string) => {
    messages1.push(message);
    allMessages.push(message);
  }, { signal: controller.signal });
  const listening2 = mq2.listen((message: string) => {
    messages2.push(message);
    allMessages.push(message);
  }, { signal: controller.signal });

  await mq.enqueue("Hello, world!");

  await waitFor(() => allMessages.length > 0, 15_000);

  assertEquals(allMessages.includes("Hello, world!"), true);

  // enqueue() with delay
  const started = Date.now();
  await mq.enqueue(
    "Delayed message",
    { delay: Temporal.Duration.from({ seconds: 3 }) },
  );

  await waitFor(() => allMessages.includes("Delayed message"), 15_000);

  // listen() with delay
  assertEquals(allMessages.includes("Delayed message"), true);
  assertGreater(Date.now() - started, 3_000);

  await mq.enqueueMany(["Message 1", "Message 2", "Message 3"]);

  await waitFor(() =>
    allMessages.includes("Message 1") &&
    allMessages.includes("Message 2") &&
    allMessages.includes("Message 3"), 15_000);

  // listen() after enqueueMany()
  assertEquals(allMessages.includes("Message 1"), true);
  assertEquals(allMessages.includes("Message 2"), true);
  assertEquals(allMessages.includes("Message 3"), true);

  // enqueueMany() with delay
  const manyStarted = Date.now();
  await mq.enqueueMany(
    ["Delayed batch 1", "Delayed batch 2"],
    { delay: Temporal.Duration.from({ seconds: 3 }) },
  );

  await waitFor(() =>
    allMessages.includes("Delayed batch 1") &&
    allMessages.includes("Delayed batch 2"), 15_000);

  // listen() after enqueueMany() with delay
  assertEquals(allMessages.includes("Delayed batch 1"), true);
  assertEquals(allMessages.includes("Delayed batch 2"), true);
  assertGreater(Date.now() - manyStarted, 3_000);

  controller.abort();
  await listening;
  await listening2;

  await conn.close();
  await conn2.close();
});

test(
  "AmqpMessageQueue [nativeRetrial: false]",
  { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false },
  async () => {
    const conn = await getConnection();
    const randomSuffix = Math.random().toString(36).substring(2);
    const queue = `fedify_queue_${randomSuffix}`;
    const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
    const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
    assertFalse(mq.nativeRetrial);

    const controller = new AbortController();
    let i = 0;
    const listening = mq.listen((message: string) => {
      if (message !== "Hello, world!") return;
      if (i++ < 1) {
        throw new Error("Test error to check native retrial");
      }
    }, { signal: controller.signal });

    await mq.enqueue("Hello, world!");

    await waitFor(() => i >= 1, 15_000);
    assertEquals(i, 1);
    await delay(5_000);

    controller.abort();
    await listening;
    await conn.close();

    assertEquals(i, 1);
  },
);

test(
  "AmqpMessageQueue [nativeRetrial: true]",
  { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false },
  async () => {
    const conn = await getConnection();
    const randomSuffix = Math.random().toString(36).substring(2);
    const queue = `fedify_queue_${randomSuffix}`;
    const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
    const mq = new AmqpMessageQueue(conn, {
      queue,
      delayedQueuePrefix,
      nativeRetrial: true,
    });
    assert(mq.nativeRetrial);

    const controller = new AbortController();
    let i = 0;
    const listening = mq.listen((message: string) => {
      if (message !== "Hello, world!") return;
      if (i++ < 1) {
        throw new Error("Test error to check native retrial");
      }
    }, { signal: controller.signal });

    await mq.enqueue("Hello, world!");

    await waitFor(() => i > 1, 15_000);

    controller.abort();
    await listening;
    await conn.close();

    assertGreater(i, 1);
  },
);

async function waitFor(
  predicate: () => boolean,
  timeoutMs: number,
): Promise<void> {
  const started = Date.now();
  while (!predicate()) {
    await delay(500);
    if (Date.now() - started > timeoutMs) {
      throw new Error("Timeout");
    }
  }
}
Loading