Unverified Commit c7c9dbcd authored by Hong Minhee's avatar Hong Minhee
Browse files

RedisMessageQueue

parent be0fe5d1
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -8,7 +8,10 @@
[![GitHub Actions][GitHub Actions badge]][GitHub Actions]

This package provides [Fedify]'s [`KvStore`] and [`MessageQueue`]
implementations for Redis.
implementations for Redis:

 -  [`RedisKvStore`]
 -  [`RedisMessageQueue`]

[JSR]: https://jsr.io/@fedify/redis
[JSR badge]: https://jsr.io/badges/@fedify/redis
@@ -19,6 +22,8 @@ implementations for Redis.
[Fedify]: https://fedify.dev/
[`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore
[`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue
[`RedisKvStore`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisKvStore
[`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisMessageQueue


Changelog
+4 −0
Original line number Diff line number Diff line
@@ -10,8 +10,12 @@
    "@deno/dnt": "jsr:@deno/dnt@^0.41.2",
    "@fedify/fedify": "jsr:@fedify/fedify@^0.10.0",
    "@std/assert": "jsr:@std/assert@^0.226.0",
    "@std/async": "jsr:@std/async@^0.224.2",
    "ioredis": "npm:ioredis@^5.4.0"
  },
  "unstable": [
    "temporal"
  ],
  "exclude": [
    "npm"
  ],
+5 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@
      "jsr:@logtape/logtape@^0.4.0": "jsr:@logtape/logtape@0.4.0",
      "jsr:@std/assert@^0.218.2": "jsr:@std/assert@0.218.2",
      "jsr:@std/assert@^0.226.0": "jsr:@std/assert@0.226.0",
      "jsr:@std/async@^0.224.2": "jsr:@std/async@0.224.2",
      "jsr:@std/bytes@^0.218.2": "jsr:@std/bytes@0.218.2",
      "jsr:@std/bytes@^1.0.0": "jsr:@std/bytes@1.0.0",
      "jsr:@std/encoding@^0.224.3": "jsr:@std/encoding@0.224.3",
@@ -83,6 +84,9 @@
          "jsr:@std/internal@^1.0.0"
        ]
      },
      "@std/async@0.224.2": {
        "integrity": "4d277d6e165df43d5e061ba0ef3edfddb8e8d558f5b920e3e6b1d2614b44d074"
      },
      "@std/bytes@0.218.2": {
        "integrity": "91fe54b232dcca73856b79a817247f4a651dbb60d51baafafb6408c137241670"
      },
@@ -549,6 +553,7 @@
      "jsr:@deno/dnt@^0.41.2",
      "jsr:@fedify/fedify@^0.10.0",
      "jsr:@std/assert@^0.226.0",
      "jsr:@std/async@^0.224.2",
      "npm:ioredis@^5.4.0"
    ]
  }
+17 −1
Original line number Diff line number Diff line
@@ -40,7 +40,23 @@ await build({
  outDir: "./npm",
  entryPoints: ["./mod.ts"],
  importMap,
  shims: { deno: true },
  shims: {
    deno: true,
    custom: [
      {
        package: {
          name: "@js-temporal/polyfill",
          version: "^0.4.4",
        },
        globalNames: [
          {
            name: "Temporal",
            exportName: "Temporal",
          },
        ],
      },
    ],
  },
  typeCheck: "both",
  declaration: "separate",
  declarationMap: true,

src/mq.test.ts

0 → 100644
+63 −0
Original line number Diff line number Diff line
import { assertEquals, assertGreater } from "@std/assert";
import { delay } from "@std/async/delay";
import { Redis } from "ioredis";
import { RedisMessageQueue } from "./mq.ts";

Deno.test("RedisMessageQueue", async (t) => {
  const mq = new RedisMessageQueue(() => new Redis(), {
    loopInterval: { seconds: 1 },
  });
  const mq2 = new RedisMessageQueue(() => new Redis(), {
    loopInterval: { seconds: 1 },
  });

  const messages: string[] = [];
  mq.listen((message: string) => {
    messages.push(message);
  });
  mq2.listen((message: string) => {
    messages.push(message);
  });

  await t.step("enqueue()", async () => {
    await mq.enqueue("Hello, world!");
  });

  await waitFor(() => messages.length > 0, 15_000);

  await t.step("listen()", () => {
    assertEquals(messages, ["Hello, world!"]);
  });

  let started = 0;
  await t.step("enqueue() with delay", async () => {
    started = Date.now();
    await mq.enqueue(
      "Delayed message",
      { delay: Temporal.Duration.from({ seconds: 3 }) },
    );
  });

  await waitFor(() => messages.length > 1, 15_000);

  await t.step("listen() with delay", () => {
    assertEquals(messages, ["Hello, world!", "Delayed message"]);
    assertGreater(Date.now() - started, 3_000);
  });

  mq[Symbol.dispose]();
  mq2[Symbol.dispose]();
});

async function waitFor(
  predicate: () => boolean,
  timeoutMs: number,
): Promise<void> {
  const started = Date.now();
  while (!predicate()) {
    await delay(500);
    if (Date.now() - started > timeoutMs) {
      throw new Error("Timeout");
    }
  }
}
Loading