Unverified Commit 7b90b873 authored by Hong Minhee's avatar Hong Minhee
Browse files

enqueueMany() method for batch message operations

parent ccf67837
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -68,6 +68,9 @@ Changelog

To be released.

 -  Added `PostgresMessageQueue.enqueueMany()` method for efficiently enqueuing
    multiple messages at once.

 -  Added some logging using [LogTape] for the sake of debugging.  The following
    categories are used:

+2 −2
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@
  },
  "imports": {
    "@deno/dnt": "jsr:@deno/dnt@^0.41.3",
    "@fedify/fedify": "jsr:@fedify/fedify@^1.0.0",
    "@logtape/logtape": "jsr:@logtape/logtape@^0.8.0",
    "@fedify/fedify": "jsr:@fedify/fedify@1.5.0-dev.732",
    "@logtape/logtape": "jsr:@logtape/logtape@^0.9.0",
    "@std/assert": "jsr:@std/assert@^0.226.0",
    "@std/async": "jsr:@std/async@^1.0.5",
    "postgres": "npm:postgres@^3.4.5"
+70 −55
Original line number Diff line number Diff line
@@ -2,25 +2,27 @@
  "version": "4",
  "specifiers": {
    "jsr:@david/code-block-writer@^13.0.2": "13.0.3",
    "jsr:@david/which-runtime@~0.2.1": "0.2.1",
    "jsr:@deno/cache-dir@~0.10.3": "0.10.3",
    "jsr:@deno/dnt@~0.41.3": "0.41.3",
    "jsr:@fedify/fedify@1": "1.0.0",
    "jsr:@hugoalh/http-header-link@^1.0.2": "1.0.2",
    "jsr:@hugoalh/is-string-singleline@1.0.2": "1.0.2",
    "jsr:@logtape/logtape@0.8": "0.8.0",
    "jsr:@logtape/logtape@~0.6.2": "0.6.2",
    "jsr:@deno/graph@~0.73.1": "0.73.1",
    "jsr:@fedify/fedify@1.5.0-dev.732": "1.5.0-dev.732+7b169275",
    "jsr:@hugoalh/http-header-link@^1.0.2": "1.0.3",
    "jsr:@hugoalh/is-string-singleline@^1.0.4": "1.0.4",
    "jsr:@logtape/logtape@0.9": "0.9.0",
    "jsr:@logtape/logtape@~0.8.2": "0.8.2",
    "jsr:@std/assert@0.223": "0.223.0",
    "jsr:@std/assert@0.226": "0.226.0",
    "jsr:@std/async@^1.0.5": "1.0.5",
    "jsr:@std/bytes@0.223": "0.223.0",
    "jsr:@std/bytes@^1.0.2": "1.0.2",
    "jsr:@std/encoding@^1.0.5": "1.0.5",
    "jsr:@std/bytes@^1.0.2": "1.0.5",
    "jsr:@std/encoding@^1.0.5": "1.0.7",
    "jsr:@std/fmt@0.223": "0.223.0",
    "jsr:@std/fmt@1": "1.0.3",
    "jsr:@std/fs@0.223": "0.223.0",
    "jsr:@std/fs@1": "1.0.5",
    "jsr:@std/fs@~0.229.3": "0.229.3",
    "jsr:@std/http@^1.0.6": "1.0.7",
    "jsr:@std/http@^1.0.6": "1.0.13",
    "jsr:@std/internal@1": "1.0.4",
    "jsr:@std/io@0.223": "0.223.0",
    "jsr:@std/path@0.223": "0.223.0",
@@ -28,17 +30,19 @@
    "jsr:@std/path@1.0.0-rc.1": "1.0.0-rc.1",
    "jsr:@std/path@^1.0.7": "1.0.8",
    "jsr:@std/path@~0.225.2": "0.225.2",
    "jsr:@std/semver@^1.0.3": "1.0.3",
    "jsr:@std/semver@^1.0.3": "1.0.4",
    "jsr:@ts-morph/bootstrap@0.24": "0.24.0",
    "jsr:@ts-morph/common@0.24": "0.24.0",
    "npm:@phensley/language-tag@^1.9.0": "1.9.0",
    "npm:@multiformats/base-x@^4.0.1": "4.0.1",
    "npm:@opentelemetry/api@^1.9.0": "1.9.0",
    "npm:@opentelemetry/semantic-conventions@^1.27.0": "1.30.0",
    "npm:@phensley/language-tag@^1.9.0": "1.10.2",
    "npm:@types/node@*": "18.16.19",
    "npm:asn1js@^3.0.5": "3.0.5",
    "npm:json-canon@^1.0.1": "1.0.1",
    "npm:jsonld@^8.3.2": "8.3.2",
    "npm:multibase@^4.0.6": "4.0.6",
    "npm:jsonld@^8.3.2": "8.3.3",
    "npm:multicodec@^3.2.1": "3.2.1",
    "npm:pkijs@^3.2.4": "3.2.4",
    "npm:pkijs@^3.2.4": "3.2.5",
    "npm:postgres@^3.4.5": "3.4.5",
    "npm:uri-template-router@^0.0.16": "0.0.16",
    "npm:url-template@^3.1.1": "3.1.1"
@@ -47,9 +51,13 @@
    "@david/code-block-writer@13.0.3": {
      "integrity": "f98c77d320f5957899a61bfb7a9bead7c6d83ad1515daee92dbacc861e13bb7f"
    },
    "@david/which-runtime@0.2.1": {
      "integrity": "2a304e36b1122ebfe384d45de8fe0fb2bfe54a0a5a2a596efbfc81f54a47a62d"
    },
    "@deno/cache-dir@0.10.3": {
      "integrity": "eb022f84ecc49c91d9d98131c6e6b118ff63a29e343624d058646b9d50404776",
      "dependencies": [
        "jsr:@deno/graph",
        "jsr:@std/fmt@0.223",
        "jsr:@std/fs@0.223",
        "jsr:@std/io",
@@ -67,40 +75,47 @@
        "jsr:@ts-morph/bootstrap"
      ]
    },
    "@fedify/fedify@1.0.0": {
      "integrity": "3bb429331cec437ac6dcf0527b3e0fda1f0261d1945b7d2f20a7a3e9fdec7aa9",
    "@deno/graph@0.73.1": {
      "integrity": "cd69639d2709d479037d5ce191a422eabe8d71bb68b0098344f6b07411c84d41"
    },
    "@fedify/fedify@1.5.0-dev.732+7b169275": {
      "integrity": "7b0dbf545b8a568f83dc7419bc75d36decea529e4fa252a31c9929c12cd475db",
      "dependencies": [
        "jsr:@david/which-runtime",
        "jsr:@hugoalh/http-header-link",
        "jsr:@logtape/logtape@~0.6.2",
        "jsr:@logtape/logtape@~0.8.2",
        "jsr:@std/async",
        "jsr:@std/bytes@^1.0.2",
        "jsr:@std/encoding",
        "jsr:@std/http",
        "jsr:@std/semver",
        "npm:@multiformats/base-x",
        "npm:@opentelemetry/api",
        "npm:@opentelemetry/semantic-conventions",
        "npm:@phensley/language-tag",
        "npm:asn1js",
        "npm:json-canon",
        "npm:jsonld",
        "npm:multibase",
        "npm:multicodec",
        "npm:pkijs",
        "npm:uri-template-router",
        "npm:url-template"
      ]
    },
    "@hugoalh/http-header-link@1.0.2": {
      "integrity": "1f607e34ac0790a0b0759f89ade294ab3a1d211e46a8dea337eaafa26950205f",
    "@hugoalh/http-header-link@1.0.3": {
      "integrity": "3372096a73d755e3351f7fbd7155db7725874c2682a594a655580e3866563024",
      "dependencies": [
        "jsr:@hugoalh/is-string-singleline"
      ]
    },
    "@hugoalh/is-string-singleline@1.0.2": {
      "integrity": "f79dc9930bd89f3534db2efc93293fde5af81648dacbb87afd921cd6a163c9bd"
    "@hugoalh/is-string-singleline@1.0.4": {
      "integrity": "b283396a7b1a1c9777c42560f657a5b13b0dee486282cce87f3c41a9e5796a66"
    },
    "@logtape/logtape@0.6.2": {
      "integrity": "80a5a203bb6775c46ede31d0582502f238811cc44e54a1e53631480c48e2df04"
    "@logtape/logtape@0.8.2": {
      "integrity": "e2ae1fc2561e8d010359b9894efb39bdb559dae44f5824540cbb26a78eee36bc"
    },
    "@logtape/logtape@0.8.0": {
      "integrity": "12bdf406a713f754d99c2ff65deb45127ce4e2c59e7136d608042250629428bd"
    "@logtape/logtape@0.9.0": {
      "integrity": "aacdeb0e040e5723021bef8b9b8c03ad7cffb5e2146a0024c30814930f6e7c89"
    },
    "@std/assert@0.223.0": {
      "integrity": "eb8d6d879d76e1cc431205bd346ed4d88dc051c6366365b1af47034b0670be24"
@@ -117,11 +132,11 @@
    "@std/bytes@0.223.0": {
      "integrity": "84b75052cd8680942c397c2631318772b295019098f40aac5c36cead4cba51a8"
    },
    "@std/bytes@1.0.2": {
      "integrity": "fbdee322bbd8c599a6af186a1603b3355e59a5fb1baa139f8f4c3c9a1b3e3d57"
    "@std/bytes@1.0.5": {
      "integrity": "4465dd739d7963d964c809202ebea6d5c6b8e3829ef25c6a224290fbb8a1021e"
    },
    "@std/encoding@1.0.5": {
      "integrity": "ecf363d4fc25bd85bd915ff6733a7e79b67e0e7806334af15f4645c569fefc04"
    "@std/encoding@1.0.7": {
      "integrity": "f631247c1698fef289f2de9e2a33d571e46133b38d042905e3eac3715030a82d"
    },
    "@std/fmt@0.223.0": {
      "integrity": "6deb37794127dfc7d7bded2586b9fc6f5d50e62a8134846608baf71ffc1a5208"
@@ -144,8 +159,8 @@
        "jsr:@std/path@^1.0.7"
      ]
    },
    "@std/http@1.0.7": {
      "integrity": "9b904fc256678a5c9759f1a53a24a3fdcc59d83dc62099bb472683b6f819194c"
    "@std/http@1.0.13": {
      "integrity": "d29618b982f7ae44380111f7e5b43da59b15db64101198bb5f77100d44eb1e1e"
    },
    "@std/internal@1.0.0": {
      "integrity": "ac6a6dfebf838582c4b4f61a6907374e27e05bedb6ce276e0f1608fe84e7cd9a"
@@ -178,8 +193,8 @@
    "@std/path@1.0.8": {
      "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
    },
    "@std/semver@1.0.3": {
      "integrity": "7c139c6076a080eeaa4252c78b95ca5302818d7eafab0470d34cafd9930c13c8"
    "@std/semver@1.0.4": {
      "integrity": "a62af791917d8fd6c48d6ebbb872f83fad3fc6671ffadbbd39ea229c2d34d175"
    },
    "@ts-morph/bootstrap@0.24.0": {
      "integrity": "a826a2ef7fa8a7c3f1042df2c034d20744d94da2ee32bf29275bcd4dffd3c060",
@@ -210,11 +225,17 @@
    "@multiformats/base-x@4.0.1": {
      "integrity": "sha512-eMk0b9ReBbV23xXU693TAIrLyeO5iTgBZGSJfpqriG8UkYvr/hC9u9pyMlAakDNHWmbhMZCDs6KQO0jzKD8OTw=="
    },
    "@noble/hashes@1.5.0": {
      "integrity": "sha512-1j6kQFb7QRru7eKN3ZDvRcP13rugwdxZqCjbiAVZfIJwgj2A65UmT4TgARXGlXgnRkORLTDTrO19ZErt7+QXgA=="
    "@noble/hashes@1.7.1": {
      "integrity": "sha512-B8XBPsn4vT/KJAGqDzbwztd+6Yte3P4V7iafm24bxgDe/mlRuK6xmWPuCNrKt2vDafZ8MfJLlchDG/vYafQEjQ=="
    },
    "@opentelemetry/api@1.9.0": {
      "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg=="
    },
    "@opentelemetry/semantic-conventions@1.30.0": {
      "integrity": "sha512-4VlGgo32k2EQ2wcCY3vEU28A0O13aOtHz3Xt2/2U5FAh9EfhD6t6DqL5Z6yAnRCntbTFDU4YfbpyzSlHNWycPw=="
    },
    "@phensley/language-tag@1.9.0": {
      "integrity": "sha512-nj2uFsnp2SSD/DEGvLqcrpTXerIe9yShGndnwY7p3Z6Ep4m6+GkrgZCNJUc8rKcXWgQmRGC2TUzwG/LlBVtuCA==",
    "@phensley/language-tag@1.10.2": {
      "integrity": "sha512-Xdui/GVDztdYb9oGzysHRRnakD9FKJIRIRTYvq5SoeGmglAgtDXOLEAm+yTMl8sqMr6AdVCz3/kH7F9wNeGnSQ==",
      "dependencies": [
        "tslib"
      ]
@@ -264,8 +285,8 @@
    "json-canon@1.0.1": {
      "integrity": "sha512-PQcj4PFOTAQxE8PgoQ4KrM0DcKWZd7S3ELOON8rmysl9I8JuFMgxu1H9v+oZsTPjjkpeS3IHPwLjr7d+gKygnw=="
    },
    "jsonld@8.3.2": {
      "integrity": "sha512-MwBbq95szLwt8eVQ1Bcfwmgju/Y5P2GdtlHE2ncyfuYjIdEhluUVyj1eudacf1mOkWIoS9GpDBTECqhmq7EOaA==",
    "jsonld@8.3.3": {
      "integrity": "sha512-9YcilrF+dLfg9NTEof/mJLMtbdX1RJ8dbWtJgE00cMOIohb1lIyJl710vFiTaiHTl6ZYODJuBd32xFvUhmv3kg==",
      "dependencies": [
        "@digitalbazaar/http-client",
        "canonicalize",
@@ -290,12 +311,6 @@
        "yallist"
      ]
    },
    "multibase@4.0.6": {
      "integrity": "sha512-x23pDe5+svdLz/k5JPGCVdfn7Q5mZVMBETiC+ORfO+sor9Sgs0smJzAjfTbM5tckeCqnaUuMYoz+k3RXMmJClQ==",
      "dependencies": [
        "@multiformats/base-x"
      ]
    },
    "multicodec@3.2.1": {
      "integrity": "sha512-+expTPftro8VAW8kfvcuNNNBgb9gPeNYV9dn+z1kJRWF2vih+/S79f2RVeIwmrJBUJ6NT9IUPWnZDQvegEh5pw==",
      "dependencies": [
@@ -317,8 +332,8 @@
        "formdata-polyfill"
      ]
    },
    "pkijs@3.2.4": {
      "integrity": "sha512-Et9V5QpvBilPFgagJcaKBqXjKrrgF5JL2mSDELk1vvbOTt4fuBhSSsGn9Tcz0TQTfS5GCpXQ31Whrpqeqp0VRg==",
    "pkijs@3.2.5": {
      "integrity": "sha512-WX0la7n7CbnguuaIQoT4Fc0IJckPDOUldzOwlZ0nwpOcySS+Six/tXBdc0RX17J5o1To0SAr3xDJjDLsOfDFQA==",
      "dependencies": [
        "@noble/hashes",
        "asn1js",
@@ -331,8 +346,8 @@
    "postgres@3.4.5": {
      "integrity": "sha512-cDWgoah1Gez9rN3H4165peY9qfpEo+SA61oQv65O3cRUE1pOEoJWwddwcqKE8XZYjbblOJlYDlLV4h67HrEVDg=="
    },
    "pvtsutils@1.3.5": {
      "integrity": "sha512-ARvb14YB9Nm2Xi6nBq1ZX6dAM0FsJnuk+31aUp4TrcZEdKUlSqOqsxJHUPJDNE3qiIp+iUPEIeR6Je/tgV7zsA==",
    "pvtsutils@1.3.6": {
      "integrity": "sha512-PLgQXQ6H2FWCaeRak8vvk1GW462lMxB5s3Jm673N82zI4vqtVUPuZdffdZbPDFRoU8kAhItWFtPCWiPpp4/EDg==",
      "dependencies": [
        "tslib"
      ]
@@ -349,8 +364,8 @@
    "setimmediate@1.0.5": {
      "integrity": "sha512-MATJdZp8sLqDl/68LfQmbP8zKPLQNV6BIZoIgrscFDQ+RsvK/BxeDQOgyxKKoh0y/8h3BqVFnCqQ/gd+reiIXA=="
    },
    "tslib@2.7.0": {
      "integrity": "sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA=="
    "tslib@2.8.1": {
      "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="
    },
    "uint8arrays@3.1.1": {
      "integrity": "sha512-+QJa8QRnbdXVpHYjLoTpJIdCTiw9Ir62nocClWuXIq2JIh4Uta0cQsTSpFL678p2CN8B+XSApwcU+pQEqVpKWg==",
@@ -358,8 +373,8 @@
        "multiformats"
      ]
    },
    "undici@5.28.4": {
      "integrity": "sha512-72RFADWFqKmUb2hmmvNODKL3p9hcB6Gt2DOQMis1SEBaV6a4MH8soBvzg+95CYhCKPFedut2JY9bMfrDl9D23g==",
    "undici@5.29.0": {
      "integrity": "sha512-raqeBD6NQK4SkWhQzeYKd1KmIG6dllBOTt55Rmkt4HtI9mwdWtJljnrXjAFUBLTSN67HWrOIZ3EPF4kjUw80Bg==",
      "dependencies": [
        "@fastify/busboy"
      ]
@@ -383,8 +398,8 @@
  "workspace": {
    "dependencies": [
      "jsr:@deno/dnt@~0.41.3",
      "jsr:@fedify/fedify@1",
      "jsr:@logtape/logtape@0.8",
      "jsr:@fedify/fedify@1.5.0-dev.732",
      "jsr:@logtape/logtape@0.9",
      "jsr:@std/assert@0.226",
      "jsr:@std/async@^1.0.5",
      "npm:postgres@^3.4.5"
+31 −0
Original line number Diff line number Diff line
@@ -56,6 +56,37 @@ Deno.test("PostgresMessageQueue", async (t) => {
    assertGreater(Date.now() - started, 3_000);
  });

  await t.step("enqueueMany()", async () => {
    while (messages.length > 0) messages.pop();
    const batchMessages = [
      "First batch message",
      { text: "Second batch message" },
      { text: "Third batch message", priority: "high" },
    ];
    await mq.enqueueMany(batchMessages);
    await waitFor(() => messages.length === batchMessages.length, 15_000);
    assertEquals(messages, batchMessages);
  });

  await t.step("enqueueMany() with delay", async () => {
    while (messages.length > 0) messages.pop();
    started = Date.now();
    const delayedBatchMessages = [
      "Delayed batch 1",
      "Delayed batch 2",
    ];
    await mq.enqueueMany(
      delayedBatchMessages,
      { delay: Temporal.Duration.from({ seconds: 2 }) },
    );
    await waitFor(
      () => messages.length === delayedBatchMessages.length,
      15_000,
    );
    assertEquals(messages, delayedBatchMessages);
    assertGreater(Date.now() - started, 2_000);
  });

  controller.abort();
  await listening;
  await listening2;
+33 −0
Original line number Diff line number Diff line
@@ -111,6 +111,39 @@ export class PostgresMessageQueue implements MessageQueue {
    });
  }

  async enqueueMany(
    // deno-lint-ignore no-explicit-any
    messages: any[],
    options?: MessageQueueEnqueueOptions,
  ): Promise<void> {
    if (messages.length === 0) return;
    await this.initialize();
    const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 });
    if (options?.delay) {
      logger.debug("Enqueuing messages with a delay of {delay}...", {
        delay,
        messages,
      });
    } else {
      logger.debug("Enqueuing messages...", { messages });
    }
    for (const message of messages) {
      await this.#sql`
        INSERT INTO ${this.#sql(this.#tableName)} (message, delay)
        VALUES (
          ${this.#json(message)},
          ${delay.toString()}
        );
      `;
    }
    logger.debug("Enqueued messages.", { messages });
    await this.#sql.notify(this.#channelName, delay.toString());
    logger.debug("Notified the message queue channel {channelName}.", {
      channelName: this.#channelName,
      messages,
    });
  }

  async listen(
    // deno-lint-ignore no-explicit-any
    handler: (message: any) => void | Promise<void>,