Unverified Commit f3787c34 authored by Hong Minhee's avatar Hong Minhee
Browse files

Instrument outbox enqueueing

parent 49db8830
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -127,6 +127,9 @@ spans:
| `activitypub.dispatch_object`                       | Server      | Dispatches the Activity Streams object.     |
| `activitypub.get_actor_handle`                      | Client      | Resolves the actor handle.                  |
| `activitypub.lookup_object`                         | Client      | Looks up the Activity Streams object.       |
| `activitypub.outbox`                                | Client      | Sends the ActivityPub activity.             |
| `activitypub.outbox`                                | Consumer    | Dequeues the ActivityPub activity to send.  |
| `activitypub.outbox`                                | Producer    | Enqueues the ActivityPub activity to send.  |
| `activitypub.parse_object`                          | Internal    | Parses the Activity Streams object.         |
| `activitypub.send_activity`                         | Client      | Sends the ActivityPub activity.             |
| `http_signatures.sign`                              | Internal    | Signs the HTTP request.                     |
@@ -157,6 +160,8 @@ for ActivityPub:
| `activitypub.activity.type`           | string[] | The qualified URI(s) of the activity type(s).                                            | `["https://www.w3.org/ns/activitystreams#Create"]`                   |
| `activitypub.activity.to`             | string[] | The URI(s) of the recipient collections/actors of the activity.                          | `["https://example.com/1/followers/2"]`                              |
| `activitypub.activity.cc`             | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity.            | `["https://www.w3.org/ns/activitystreams#Public"]`                   |
| `activitypub.activity.bto`            | string[] | The URI(s) of the blind recipient collections/actors of the activity.                    | `["https://example.com/1/followers/2"]`                              |
| `activitypub.activity.bcc`            | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity.      | `["https://www.w3.org/ns/activitystreams#Public"]`                   |
| `activitypub.activity.retries`        | int      | The ordinal number of activity resending attempt (if and only if it's retried).          | `3`                                                                  |
| `activitypub.actor.id`                | string   | The URI of the actor object.                                                             | `"https://example.com/actor/1"`                                      |
| `activitypub.actor.type`              | string[] | The qualified URI(s) of the actor type(s).                                               | `["https://www.w3.org/ns/activitystreams#Person"]`                   |
+136 −6
Original line number Diff line number Diff line
import { getLogger, withContext } from "@logtape/logtape";
import {
  context,
  propagation,
  type Span,
  SpanKind,
  SpanStatusCode,
@@ -492,9 +494,37 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
  }

  #listenQueue(ctxData: TContextData, message: Message): Promise<void> {
    const tracer = this.#getTracer();
    return withContext({ messageId: message.id }, async () => {
      if (message.type === "outbox") {
        const extractedContext = propagation.extract(
          context.active(),
          message.traceContext,
        );
        await tracer.startActiveSpan(
          "activitypub.outbox",
          {
            kind: SpanKind.CONSUMER,
            attributes: { "activitypub.activity.type": message.activityType },
          },
          extractedContext,
          async (span) => {
            if (message.activityId != null) {
              span.setAttribute("activitypub.activity.id", message.activityId);
            }
            try {
              await this.#listenOutboxMessage(ctxData, message);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
                message: String(e),
              });
              throw e;
            } finally {
              span.end();
            }
          },
        );
      } else if (message.type === "inbox") {
        await this.#listenInboxMessage(ctxData, message);
      }
@@ -1824,6 +1854,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
    recipients: Recipient | Recipient[],
    activity: Activity,
    options: SendActivityInternalOptions<TContextData>,
    span?: Span,
  ): Promise<void> {
    const logger = getLogger(["fedify", "federation", "outbox"]);
    const {
@@ -1849,9 +1880,9 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
      );
    }
    if (activity.id == null) {
      activity = activity.clone({
        id: new URL(`urn:uuid:${crypto.randomUUID()}`),
      });
      const id = new URL(`urn:uuid:${crypto.randomUUID()}`);
      activity = activity.clone({ id });
      span?.setAttribute("activitypub.activity.id", id.href);
    }
    const inboxes = extractInboxes({
      recipients: Array.isArray(recipients) ? recipients : [recipients],
@@ -1975,6 +2006,8 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
      keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
    }
    if (!this.manuallyStartQueue) this.#startQueue(contextData);
    const carrier: Record<string, string> = {};
    propagation.inject(context.active(), carrier);
    const promises: Promise<void>[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
@@ -1995,6 +2028,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
              inboxes[inbox].actorIds,
            ),
        },
        traceContext: carrier,
      };
      promises.push(this.outboxQueue.enqueue(message));
    }
@@ -2806,7 +2840,7 @@ export class ContextImpl<TContextData> implements Context<TContextData> {
    });
  }

  async sendActivity(
  sendActivity(
    sender:
      | SenderKeyPair
      | SenderKeyPair[]
@@ -2816,6 +2850,58 @@ export class ContextImpl<TContextData> implements Context<TContextData> {
    recipients: Recipient | Recipient[] | "followers",
    activity: Activity,
    options: SendActivityOptions = {},
  ): Promise<void> {
    const tracer = this.tracerProvider.getTracer(
      metadata.name,
      metadata.version,
    );
    return tracer.startActiveSpan(
      "activitypub.outbox",
      {
        kind: this.federation.outboxQueue == null || options.immediate
          ? SpanKind.CLIENT
          : SpanKind.PRODUCER,
        attributes: {
          "activitypub.activity.type": getTypeId(activity).href,
          "activitypub.activity.to": activity.toIds.map((to) => to.href),
          "activitypub.activity.cc": activity.toIds.map((cc) => cc.href),
          "activitypub.activity.bto": activity.btoIds.map((bto) => bto.href),
          "activitypub.activity.bcc": activity.toIds.map((bcc) => bcc.href),
        },
      },
      async (span) => {
        try {
          if (activity.id != null) {
            span.setAttribute("activitypub.activity.id", activity.id.href);
          }
          await this.sendActivityInternal(
            sender,
            recipients,
            activity,
            options,
            span,
          );
        } catch (e) {
          span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
          throw e;
        } finally {
          span.end();
        }
      },
    );
  }

  protected async sendActivityInternal(
    sender:
      | SenderKeyPair
      | SenderKeyPair[]
      | { identifier: string }
      | { username: string }
      | { handle: string },
    recipients: Recipient | Recipient[] | "followers",
    activity: Activity,
    options: SendActivityOptions = {},
    span: Span,
  ): Promise<void> {
    let keys: SenderKeyPair[];
    let identifier: string | null = null;
@@ -2851,6 +2937,7 @@ export class ContextImpl<TContextData> implements Context<TContextData> {
          identifier = mapped;
        }
      }
      span.setAttribute("fedify.actor.identifier", identifier);
      keys = await this.getKeyPairsFromIdentifier(identifier);
      if (keys.length < 1) {
        throw new Error(
@@ -2895,11 +2982,13 @@ export class ContextImpl<TContextData> implements Context<TContextData> {
    } else {
      expandedRecipients = [recipients];
    }
    span.setAttribute("activitypub.inboxes", expandedRecipients.length);
    return await this.federation.sendActivity(
      keys,
      expandedRecipients,
      activity,
      opts,
      span,
    );
  }

@@ -3094,7 +3183,45 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
    recipients: "followers",
    options?: ForwardActivityOptions,
  ): Promise<void>;
  async forwardActivity(
  forwardActivity(
    forwarder:
      | SenderKeyPair
      | SenderKeyPair[]
      | { identifier: string }
      | { username: string }
      | { handle: string },
    recipients: Recipient | Recipient[] | "followers",
    options?: ForwardActivityOptions,
  ): Promise<void> {
    const tracer = this.tracerProvider.getTracer(
      metadata.name,
      metadata.version,
    );
    return tracer.startActiveSpan(
      "activitypub.outbox",
      {
        kind: this.federation.outboxQueue == null || options?.immediate
          ? SpanKind.CLIENT
          : SpanKind.PRODUCER,
        attributes: { "activitypub.activity.type": this.activityType },
      },
      async (span) => {
        try {
          if (this.activityId != null) {
            span.setAttribute("activitypub.activity.id", this.activityId);
          }
          await this.forwardActivityInternal(forwarder, recipients, options);
        } catch (e) {
          span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
          throw e;
        } finally {
          span.end();
        }
      },
    );
  }

  private async forwardActivityInternal(
    forwarder:
      | SenderKeyPair
      | SenderKeyPair[]
@@ -3234,6 +3361,8 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
      const privateKeyJwk = await exportJwk(privateKey);
      keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
    }
    const carrier: Record<string, string> = {};
    propagation.inject(context.active(), carrier);
    const promises: Promise<void>[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
@@ -3248,6 +3377,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
        started: new Date().toISOString(),
        attempt: 0,
        headers: {},
        traceContext: carrier,
      };
      promises.push(this.federation.outboxQueue.enqueue(message));
    }
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ export interface OutboxMessage {
  started: string;
  attempt: number;
  headers: Record<string, string>;
  traceContext: Record<string, string>;
}

export interface InboxMessage {