Unverified Commit 8b3d47de authored by Hong Minhee's avatar Hong Minhee
Browse files

`Federation.processQueuedTask()` method

parent 065a0d45
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -41,11 +41,16 @@ To be released. Note that 1.6.0 was skipped due to a mistake in the versioning.
     -  Added `HttpMessageSignaturesSpecDeterminer` interface.
     -  Added `--first-knock` option to `fedify lookup` command.

 -  Added `WorkersKvStore` class.  [[#233], [#241], [#242]]
 -  Fedify now supports [Cloudflare Workers].  [[#233]]

     -  Added `Federation.processQueuedTask()` method.  [[#242]]
     -  Added `Message` type.  [[#242]]
     -  Added `WorkersKvStore` class.  [[#241], [#242]]

 -  The minimum supported version of Node.js is now 22.0.0.

[RFC 9421]: https://www.rfc-editor.org/rfc/rfc9421
[Cloudflare Workers]: https://workers.cloudflare.com/
[#208]: https://github.com/fedify-dev/fedify/issues/208
[#227]: https://github.com/fedify-dev/fedify/issues/227
[#233]: https://github.com/fedify-dev/fedify/issues/233
+15 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ import type {
  FederationQueueOptions,
} from "./middleware.ts";
import type { MessageQueue } from "./mq.ts";
import type { Message } from "./queue.ts";
import type { RetryPolicy } from "./retry.ts";

/**
@@ -468,6 +469,20 @@ export interface Federation<TContextData> extends Federatable<TContextData> {
    options?: FederationStartQueueOptions,
  ): Promise<void>;

  /**
   * Processes a queued message task.  This method handles different types of
   * tasks such as fanout, outbox, and inbox messages.
   *
   * Note that you usually do not need to call this method directly unless you
   * are deploying your federated application on a platform that does not
   * support long-running processing, such as Cloudflare Workers.
   * @param contextData The context data to pass to the context.
   * @param message The message that represents the task to be processed.
   * @returns A promise that resolves when the message has been processed.
   * @since 1.6.0
   */
  processQueuedTask(contextData: TContextData, message: Message): Promise<void>;

  /**
   * Create a new context.
   * @param baseUrl The base URL of the server.  The `pathname` remains root,
+10 −7
Original line number Diff line number Diff line
@@ -425,7 +425,7 @@ export class FederationImpl<TContextData>
      this.inboxQueueStarted = true;
      promises.push(
        this.inboxQueue.listen(
          (msg) => this.#listenQueue(ctxData, msg),
          (msg) => this.processQueuedTask(ctxData, msg),
          { signal },
        ),
      );
@@ -440,7 +440,7 @@ export class FederationImpl<TContextData>
      this.outboxQueueStarted = true;
      promises.push(
        this.outboxQueue.listen(
          (msg) => this.#listenQueue(ctxData, msg),
          (msg) => this.processQueuedTask(ctxData, msg),
          { signal },
        ),
      );
@@ -456,7 +456,7 @@ export class FederationImpl<TContextData>
      this.fanoutQueueStarted = true;
      promises.push(
        this.fanoutQueue.listen(
          (msg) => this.#listenQueue(ctxData, msg),
          (msg) => this.processQueuedTask(ctxData, msg),
          { signal },
        ),
      );
@@ -464,7 +464,10 @@ export class FederationImpl<TContextData>
    await Promise.all(promises);
  }

  #listenQueue(ctxData: TContextData, message: Message): Promise<void> {
  processQueuedTask(
    contextData: TContextData,
    message: Message,
  ): Promise<void> {
    const tracer = this._getTracer();
    const extractedContext = propagation.extract(
      context.active(),
@@ -486,7 +489,7 @@ export class FederationImpl<TContextData>
              span.setAttribute("activitypub.activity.id", message.activityId);
            }
            try {
              await this.#listenFanoutMessage(ctxData, message);
              await this.#listenFanoutMessage(contextData, message);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
@@ -514,7 +517,7 @@ export class FederationImpl<TContextData>
              span.setAttribute("activitypub.activity.id", message.activityId);
            }
            try {
              await this.#listenOutboxMessage(ctxData, message, span);
              await this.#listenOutboxMessage(contextData, message, span);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
@@ -538,7 +541,7 @@ export class FederationImpl<TContextData>
          extractedContext,
          async (span) => {
            try {
              await this.#listenInboxMessage(ctxData, message, span);
              await this.#listenInboxMessage(contextData, message, span);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
+1 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ export {
  type FederationQueueOptions,
} from "./middleware.ts";
export * from "./mq.ts";
export type { Message } from "./queue.ts";
export * from "./retry.ts";
export * from "./router.ts";
export { type SenderKeyPair } from "./send.ts";
+9 −0
Original line number Diff line number Diff line
@@ -3,6 +3,15 @@ export interface SenderKeyJwkPair {
  privateKey: JsonWebKey;
}

/**
 * A message that represents a task to be processed by the background worker.
 * The concrete type of the message depends on the `type` property.
 *
 * Please do not depend on the concrete types of the messages, as they may
 * change in the future.  You should treat the `Message` type as an opaque
 * type.
 * @since 1.6.0
 */
export type Message = FanoutMessage | OutboxMessage | InboxMessage;

export interface FanoutMessage {