Unverified Commit 798989c6 authored by Hong Minhee's avatar Hong Minhee
Browse files

Docs for `WorkersMessageQueue`

parent c7982fdf
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -181,7 +181,8 @@ export default {
> [!NOTE]
> Since your `KVNamespace` is not bound to a global variable, but rather
> passed as an argument to the `fetch()` method, you need to instantiate
> your `Federation` object inside the `fetch()` method.
> your `Federation` object inside the `fetch()` method, rather than the top
> level.
>
> For better organization, you probably want to use a builder pattern to
> register your dispatchers and listeners before instantiating the `Federation`
+82 −0
Original line number Diff line number Diff line
@@ -213,6 +213,88 @@ const federation = createFederation({
[@fedify/amqp]: https://github.com/fedify-dev/amqp
[RabbitMQ]: https://www.rabbitmq.com/

### `WorkersMessageQueue` (Cloudflare Workers only)

*This API is available since Fedify 1.6.0.*

`WorkersMessageQueue` is a message queue implementation for [Cloudflare Workers]
that uses Cloudflare's built-in [Cloudflare Queues] API.  It provides
scalability and high performance, making it suitable for production use in
Cloudflare Workers environments.  It requires a Cloudflare Queues setup and
management.

Best for
:   Production use in Cloudflare Workers environments.

Pros
:   Persistent, reliable, scalable, easy to set up.

Cons
:   Only available in Cloudflare Workers runtime.

~~~~ typescript twoslash
// @noErrors: 2322 2345
import type { FederationBuilder, KvStore } from "@fedify/fedify";
const builder = undefined as unknown as FederationBuilder<void>;
// ---cut-before---
import type { Federation, Message } from "@fedify/fedify";
import { WorkersMessageQueue } from "@fedify/fedify/x/cfworkers";

export default {
  async fetch(request, env, ctx) {
    const federation: Federation<void> = await builder.build({
// ---cut-start---
      kv: undefined as unknown as KvStore,
// ---cut-end---
      queue: new WorkersMessageQueue(env.QUEUE_BINDING),
    });
    // Omit the rest of the code for brevity
  },

  // Since defining a `queue()` method is the only way to consume messages
  // from the queue in Cloudflare Workers, we need to define it so that
  // the messages can be manually processed by `Federation.processQueuedTask()`
  // method:
  async queue(batch, env, ctx) {
    const federation: Federation<void> = await builder.build({
// ---cut-start---
      kv: undefined as unknown as KvStore,
// ---cut-end---
      queue: new WorkersMessageQueue(env.QUEUE_BINDING),
    });
    for (const msg of batch.messages) {
      await federation.processQueuedTask(
        undefined,  // You need to pass your context data here
        msg.body as Message,  // You need to cast the message body to `Message`
      );
    }
  }
} satisfies ExportedHandler<{ QUEUE_BINDING: Queue }>;
~~~~

> [!NOTE]
> Since your `Queue` is not bound to a global variable, but rather passed as
> an argument to the `fetch()` and `queue()` methods, you need to instantiate
> your `Federation` object inside these methods, rather than at the top level.
>
> For better organization, you probably want to use a builder pattern to
> register your dispatchers and listeners before instantiating the `Federation`
> object.  See the [*Builder pattern for structuring*
> section](./federation.md#builder-pattern-for-structuring) for details.

> [!NOTE]
> The [Cloudflare Queues] API does not provide a way to poll messages from
> the queue, so `WorkersMessageQueue.listen()` method always throws
> a `TypeError` when invoked.  Instead, you should define a `queue()` method
> in your Cloudflare worker, which will be called by the Cloudflare Queues
> API when new messages are available in the queue.  Inside the `queue()`
> method, you need to call `Federation.processQueuedTask()` method to manually
> process the messages.  The `queue()` method is the only way to consume
> messages from the queue in Cloudflare Workers.

[Cloudflare Workers]: https://workers.cloudflare.com/
[Cloudflare Queues]: https://developers.cloudflare.com/queues/


Implementing a custom `MessageQueue`
------------------------------------
+1 −1
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@
    "@cloudflare/workers-types": "4.20250529.0",
    "@deno/kv": "^0.8.4",
    "@fedify/amqp": "^0.2.0",
    "@fedify/fedify": "1.6.1-pr.242.861",
    "@fedify/fedify": "1.6.1-pr.242.863",
    "@fedify/postgres": "^0.3.0",
    "@fedify/redis": "^0.4.0",
    "@hono/node-server": "^1.13.7",
+5 −5
Original line number Diff line number Diff line
@@ -21,8 +21,8 @@ importers:
        specifier: ^0.2.0
        version: 0.2.0(web-streams-polyfill@3.3.3)
      '@fedify/fedify':
        specifier: 1.6.1-pr.242.861
        version: 1.6.1-pr.242.861(web-streams-polyfill@3.3.3)
        specifier: 1.6.1-pr.242.863
        version: 1.6.1-pr.242.863(web-streams-polyfill@3.3.3)
      '@fedify/postgres':
        specifier: ^0.3.0
        version: 0.3.0(web-streams-polyfill@3.3.3)
@@ -460,8 +460,8 @@ packages:
    resolution: {integrity: sha512-s4ev+HMu6TNH1/RMNvIGvyn17D2dahq8Fpn/aDuBBngi0rHuOodZsoOnQMvEl9MoU0isv9GhPq5BP8oPYSmGaw==}
    engines: {bun: '>=1.1.0', deno: '>=2.0.0', node: '>=20.0.0'}

  '@fedify/fedify@1.6.1-pr.242.861':
    resolution: {integrity: sha512-EiBxK8oJsiqrvnQzzqZj4HaPEpIa09PuGQT6v8aC9ZlOoqwzsNId4d6R7FnyfzLUNtPCKHmOaCCWVh6xSqfWvA==}
  '@fedify/fedify@1.6.1-pr.242.863':
    resolution: {integrity: sha512-NyjF5dUSLPiCXqe6yHMf9cdrNRv/LiWkB7mYtophRWfkHNoawSi2TsPfbBruvxH03+CyApOO22S0l7bSAgZPDg==}
    engines: {bun: '>=1.1.0', deno: '>=2.0.0', node: '>=22.0.0'}

  '@fedify/postgres@0.3.0':
@@ -3009,7 +3009,7 @@ snapshots:
    transitivePeerDependencies:
      - web-streams-polyfill

  '@fedify/fedify@1.6.1-pr.242.861(web-streams-polyfill@3.3.3)':
  '@fedify/fedify@1.6.1-pr.242.863(web-streams-polyfill@3.3.3)':
    dependencies:
      '@cfworker/json-schema': 4.1.1
      '@es-toolkit/es-toolkit': es-toolkit@1.38.0