Unverified Commit 5eedba62 authored by Hong Minhee's avatar Hong Minhee
Browse files

Instrument inbox

parent 9cfc4531
Loading
Loading
Loading
Loading
+28 −23
Original line number Diff line number Diff line
@@ -118,14 +118,18 @@ Fedify automatically instruments the following operations with OpenTelemetry
spans:

| Span name                                           | [Span kind] | Description                                   |
|-----------------------------------------------------|-------------|---------------------------------------------|
|-----------------------------------------------------|-------------|-----------------------------------------------|
| `{method} {template}`                               | Server      | Serves the incoming HTTP request.             |
| `activitypub.dispatch_actor`                        | Server      | Dispatches the ActivityPub actor.             |
| `activitypub.dispatch_actor_key_pairs`              | Server      | Dispatches the ActivityPub actor key pairs.   |
| `activitypub.dispatch_collection {collection}`      | Server      | Dispatches the ActivityPub collection.        |
| `activitypub.dispatch_collection_page {collection}` | Server      | Dispatches the ActivityPub collection page.   |
| `activitypub.dispatch_inbox_listener {type}`        | Internal    | Dispatches the ActivityPub inbox listener.    |
| `activitypub.dispatch_object`                       | Server      | Dispatches the Activity Streams object.       |
| `activitypub.get_actor_handle`                      | Client      | Resolves the actor handle.                    |
| `activitypub.inbox`                                 | Consumer    | Dequeues the ActivityPub activity to receive. |
| `activitypub.inbox`                                 | Producer    | Enqueues the ActivityPub activity to receive. |
| `activitypub.inbox`                                 | Server      | Receives the ActivityPub activity.            |
| `activitypub.lookup_object`                         | Client      | Looks up the Activity Streams object.         |
| `activitypub.outbox`                                | Client      | Sends the ActivityPub activity.               |
| `activitypub.outbox`                                | Consumer    | Dequeues the ActivityPub activity to send.    |
@@ -174,6 +178,7 @@ for ActivityPub:
| `activitypub.inboxes`                 | int      | The number of inboxes the activity is sent to.                                           | `12`                                                                 |
| `activitypub.shared_inbox`            | boolean  | Whether the activity is sent to the shared inbox.                                        | `true`                                                               |
| `fedify.actor.identifier`             | string   | The identifier of the actor.                                                             | `"1"`                                                                |
| `fedify.inbox.recipient`              | string   | The identifier of the inbox recipient.                                                   | `"1"`                                                                |
| `fedify.object.type`                  | string   | The URI of the object type.                                                              | `"https://www.w3.org/ns/activitystreams#Note"`                       |
| `fedify.object.values.{parameter}`    | string[] | The argument values of the object dispatcher.                                            | `["1", "2"]`                                                         |
| `fedify.collection.cursor`            | string   | The cursor of the collection.                                                            | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="`             |
+204 −90
Original line number Diff line number Diff line
import { getLogger } from "@logtape/logtape";
import type { TracerProvider } from "@opentelemetry/api";
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
import type { Span, TracerProvider } from "@opentelemetry/api";
import {
  context,
  propagation,
  SpanKind,
  SpanStatusCode,
  trace,
} from "@opentelemetry/api";
import { accepts } from "@std/http/negotiation";
import metadata from "../deno.json" with { type: "json" };
import type { DocumentLoader } from "../runtime/docloader.ts";
@@ -431,10 +437,38 @@ export interface InboxHandlerParameters<TContextData> {
}

export async function handleInbox<TContextData>(
  request: Request,
  options: InboxHandlerParameters<TContextData>,
): Promise<Response> {
  const tracerProvider = options.tracerProvider ?? trace.getTracerProvider();
  const tracer = tracerProvider.getTracer(metadata.name, metadata.version);
  return await tracer.startActiveSpan(
    "activitypub.inbox",
    {
      kind: options.queue == null ? SpanKind.SERVER : SpanKind.PRODUCER,
      attributes: { "activitypub.shared_inbox": options.recipient == null },
    },
    async (span) => {
      if (options.recipient != null) {
        span.setAttribute("fedify.inbox.recipient", options.recipient);
      }
      try {
        return await handleInboxInternal(request, options, span);
      } catch (e) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
        throw e;
      } finally {
        span.end();
      }
    },
  );
}

async function handleInboxInternal<TContextData>(
  request: Request,
  {
    recipient,
    context,
    context: ctx,
    inboxContextFactory,
    kv,
    kvPrefixes,
@@ -447,26 +481,43 @@ export async function handleInbox<TContextData>(
    skipSignatureVerification,
    tracerProvider,
  }: InboxHandlerParameters<TContextData>,
  span: Span,
): Promise<Response> {
  const logger = getLogger(["fedify", "federation", "inbox"]);
  if (actorDispatcher == null) {
    logger.error("Actor dispatcher is not set.", { recipient });
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: "Actor dispatcher is not set.",
    });
    return await onNotFound(request);
  } else if (recipient != null) {
    const actor = await actorDispatcher(context, recipient);
    const actor = await actorDispatcher(ctx, recipient);
    if (actor == null) {
      logger.error("Actor {recipient} not found.", { recipient });
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: `Actor ${recipient} not found.`,
      });
      return await onNotFound(request);
    }
  }
  if (request.bodyUsed) {
    logger.error("Request body has already been read.", { recipient });
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: "Request body has already been read.",
    });
    return new Response("Internal server error.", {
      status: 500,
      headers: { "Content-Type": "text/plain; charset=utf-8" },
    });
  } else if (request.body?.locked) {
    logger.error("Request body is locked.", { recipient });
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: "Request body is locked.",
    });
    return new Response("Internal server error.", {
      status: 500,
      headers: { "Content-Type": "text/plain; charset=utf-8" },
@@ -478,13 +529,17 @@ export async function handleInbox<TContextData>(
  } catch (error) {
    logger.error("Failed to parse JSON:\n{error}", { recipient, error });
    try {
      await inboxErrorHandler?.(context, error as Error);
      await inboxErrorHandler?.(ctx, error as Error);
    } catch (error) {
      logger.error(
        "An unexpected error occurred in inbox error handler:\n{error}",
        { error, activity: json, recipient },
      );
    }
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: `Failed to parse JSON:\n${error}`,
    });
    return new Response("Invalid JSON.", {
      status: 400,
      headers: { "Content-Type": "text/plain; charset=utf-8" },
@@ -501,7 +556,7 @@ export async function handleInbox<TContextData>(
      if (serialized == null) return undefined;
      let object: Object;
      try {
        object = await Object.fromJsonLd(serialized, context);
        object = await Object.fromJsonLd(serialized, ctx);
      } catch {
        await kv.delete([...kvPrefixes.publicKey, keyId.href]);
        return undefined;
@@ -519,13 +574,13 @@ export async function handleInbox<TContextData>(
        return;
      }
      this.nullKeys.delete(keyId.href);
      const serialized = await key.toJsonLd(context);
      const serialized = await key.toJsonLd(ctx);
      await kv.set([...kvPrefixes.publicKey, keyId.href], serialized);
    },
  };
  const ldSigVerified = await verifyJsonLd(json, {
    contextLoader: context.contextLoader,
    documentLoader: context.documentLoader,
    contextLoader: ctx.contextLoader,
    documentLoader: ctx.documentLoader,
    keyCache,
    tracerProvider,
  });
@@ -533,7 +588,7 @@ export async function handleInbox<TContextData>(
  let activity: Activity | null = null;
  if (ldSigVerified) {
    logger.debug("Linked Data Signatures are verified.", { recipient, json });
    activity = await Activity.fromJsonLd(jsonWithoutSig, context);
    activity = await Activity.fromJsonLd(jsonWithoutSig, ctx);
  } else {
    logger.debug(
      "Linked Data Signatures are not verified.",
@@ -541,8 +596,8 @@ export async function handleInbox<TContextData>(
    );
    try {
      activity = await verifyObject(Activity, jsonWithoutSig, {
        contextLoader: context.contextLoader,
        documentLoader: context.documentLoader,
        contextLoader: ctx.contextLoader,
        documentLoader: ctx.documentLoader,
        keyCache,
        tracerProvider,
      });
@@ -553,13 +608,17 @@ export async function handleInbox<TContextData>(
        error,
      });
      try {
        await inboxErrorHandler?.(context, error as Error);
        await inboxErrorHandler?.(ctx, error as Error);
      } catch (error) {
        logger.error(
          "An unexpected error occurred in inbox error handler:\n{error}",
          { error, activity: json, recipient },
        );
      }
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: `Failed to parse activity:\n${error}`,
      });
      return new Response("Invalid activity.", {
        status: 400,
        headers: { "Content-Type": "text/plain; charset=utf-8" },
@@ -581,8 +640,8 @@ export async function handleInbox<TContextData>(
  if (activity == null) {
    if (!skipSignatureVerification) {
      const key = await verifyRequest(request, {
        contextLoader: context.contextLoader,
        documentLoader: context.documentLoader,
        contextLoader: ctx.contextLoader,
        documentLoader: ctx.documentLoader,
        timeWindow: signatureTimeWindow,
        keyCache,
        tracerProvider,
@@ -592,6 +651,10 @@ export async function handleInbox<TContextData>(
          "Failed to verify the request's HTTP Signatures.",
          { recipient },
        );
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: `Failed to verify the request's HTTP Signatures.`,
        });
        const response = new Response(
          "Failed to verify the request signature.",
          {
@@ -605,8 +668,12 @@ export async function handleInbox<TContextData>(
      }
      httpSigKey = key;
    }
    activity = await Activity.fromJsonLd(jsonWithoutSig, context);
    activity = await Activity.fromJsonLd(jsonWithoutSig, ctx);
  }
  if (activity.id != null) {
    span.setAttribute("activitypub.activity.id", activity.id.href);
  }
  span.setAttribute("activitypub.activity.type", getTypeId(activity).href);
  const cacheKey = activity.id == null
    ? null
    : [...kvPrefixes.activityIdempotence, activity.id.href] satisfies KvKey;
@@ -618,6 +685,10 @@ export async function handleInbox<TContextData>(
        activity: json,
        recipient,
      });
      span.setStatus({
        code: SpanStatusCode.UNSET,
        message: `Activity ${activity.id?.href} has already been processed.`,
      });
      return new Response(
        `Activity <${activity.id}> has already been processed.`,
        {
@@ -629,14 +700,15 @@ export async function handleInbox<TContextData>(
  }
  if (activity.actorId == null) {
    logger.error("Missing actor.", { activity: json });
    const response = new Response("Missing actor.", {
    span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." });
    return new Response("Missing actor.", {
      status: 400,
      headers: { "Content-Type": "text/plain; charset=utf-8" },
    });
    return response;
  }
  span.setAttribute("activitypub.actor.id", activity.actorId.href);
  if (
    httpSigKey != null && !await doesActorOwnKey(activity, httpSigKey, context)
    httpSigKey != null && !await doesActorOwnKey(activity, httpSigKey, ctx)
  ) {
    logger.error(
      "The signer ({keyId}) and the actor ({actorId}) do not match.",
@@ -647,13 +719,20 @@ export async function handleInbox<TContextData>(
        actorId: activity.actorId.href,
      },
    );
    const response = new Response("The signer and the actor do not match.", {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: `The signer (${httpSigKey.id?.href}) and ` +
        `the actor (${activity.actorId.href}) do not match.`,
    });
    return new Response("The signer and the actor do not match.", {
      status: 401,
      headers: { "Content-Type": "text/plain; charset=utf-8" },
    });
    return response;
  }
  if (queue != null) {
    const carrier: Record<string, string> = {};
    propagation.inject(context.active(), carrier);
    try {
      await queue.enqueue(
        {
          type: "inbox",
@@ -663,8 +742,21 @@ export async function handleInbox<TContextData>(
          identifier: recipient,
          attempt: 0,
          started: new Date().toISOString(),
          traceContext: carrier,
        } satisfies InboxMessage,
      );
    } catch (error) {
      logger.error(
        "Failed to enqueue the incoming activity {activityId}:\n{error}",
        { error, activityId: activity.id?.href, activity: json, recipient },
      );
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message:
          `Failed to enqueue the incoming activity ${activity.id?.href}.`,
      });
      throw error;
    }
    logger.info(
      "Activity {activityId} is enqueued.",
      { activityId: activity.id?.href, activity: json, recipient },
@@ -674,36 +766,49 @@ export async function handleInbox<TContextData>(
      headers: { "Content-Type": "text/plain; charset=utf-8" },
    });
  }
  const listener = inboxListeners?.dispatch(activity);
  if (listener == null) {
  tracerProvider = tracerProvider ?? trace.getTracerProvider();
  const tracer = tracerProvider.getTracer(metadata.name, metadata.version);
  const response = await tracer.startActiveSpan(
    "activitypub.dispatch_inbox_listener",
    { kind: SpanKind.INTERNAL },
    async (span) => {
      const dispatched = inboxListeners?.dispatchWithClass(activity!);
      if (dispatched == null) {
        logger.error(
          "Unsupported activity type:\n{activity}",
          { activity: json, recipient },
        );
        span.setStatus({
          code: SpanStatusCode.UNSET,
          message: `Unsupported activity type: ${getTypeId(activity!).href}`,
        });
        span.end();
        return new Response("", {
          status: 202,
          headers: { "Content-Type": "text/plain; charset=utf-8" },
        });
      }
      const { class: cls, listener } = dispatched;
      span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`);
      try {
        await listener(
          inboxContextFactory(
            recipient,
            json,
        activity.id?.href,
        getTypeId(activity).href,
            activity?.id?.href,
            getTypeId(activity!).href,
          ),
      activity,
          activity!,
        );
      } catch (error) {
        try {
      await inboxErrorHandler?.(context, error as Error);
          await inboxErrorHandler?.(ctx, error as Error);
        } catch (error) {
          logger.error(
            "An unexpected error occurred in inbox error handler:\n{error}",
            {
              error,
          activityId: activity.id?.href,
              activityId: activity!.id?.href,
              activity: json,
              recipient,
            },
@@ -713,27 +818,36 @@ export async function handleInbox<TContextData>(
          "Failed to process the incoming activity {activityId}:\n{error}",
          {
            error,
        activityId: activity.id?.href,
            activityId: activity!.id?.href,
            activity: json,
            recipient,
          },
        );
        span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) });
        span.end();
        return new Response("Internal server error.", {
          status: 500,
          headers: { "Content-Type": "text/plain; charset=utf-8" },
        });
      }
      if (cacheKey != null) {
    await kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }) });
        await kv.set(cacheKey, true, {
          ttl: Temporal.Duration.from({ days: 1 }),
        });
      }
      logger.info(
        "Activity {activityId} has been processed.",
    { activityId: activity.id?.href, activity: json, recipient },
        { activityId: activity!.id?.href, activity: json, recipient },
      );
      span.end();
      return new Response("", {
        status: 202,
        headers: { "Content-Type": "text/plain; charset=utf-8" },
      });
    },
  );
  if (response.status >= 500) span.setStatus({ code: SpanStatusCode.ERROR });
  return response;
}

/**
+40 −0
Original line number Diff line number Diff line
@@ -21,7 +21,15 @@ test("InboxListenerSet", () => {
  const listenOffer = () => {};
  listeners.add(Offer, listenOffer);
  assertEquals(listeners.dispatch(activity), null);
  assertEquals(
    listeners.dispatchWithClass(offer),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(offer), listenOffer);
  assertEquals(
    listeners.dispatchWithClass(invite),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(invite), listenOffer);
  assertEquals(listeners.dispatch(create), null);
  assertEquals(listeners.dispatch(update), null);
@@ -30,17 +38,49 @@ test("InboxListenerSet", () => {
  listeners.add(Create, listenCreate);
  assertEquals(listeners.dispatch(activity), null);
  assertEquals(listeners.dispatch(offer), listenOffer);
  assertEquals(
    listeners.dispatchWithClass(offer),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(invite), listenOffer);
  assertEquals(
    listeners.dispatchWithClass(invite),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(create), listenCreate);
  assertEquals(
    listeners.dispatchWithClass(create),
    { class: Create, listener: listenCreate },
  );
  assertEquals(listeners.dispatch(update), null);

  const listenActivity = () => {};
  listeners.add(Activity, listenActivity);
  assertEquals(listeners.dispatch(activity), listenActivity);
  assertEquals(
    listeners.dispatchWithClass(activity),
    { class: Activity, listener: listenActivity },
  );
  assertEquals(listeners.dispatch(offer), listenOffer);
  assertEquals(
    listeners.dispatchWithClass(offer),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(invite), listenOffer);
  assertEquals(
    listeners.dispatchWithClass(invite),
    { class: Offer, listener: listenOffer },
  );
  assertEquals(listeners.dispatch(create), listenCreate);
  assertEquals(
    listeners.dispatchWithClass(create),
    { class: Create, listener: listenCreate },
  );
  assertEquals(listeners.dispatch(update), listenActivity);
  assertEquals(
    listeners.dispatchWithClass(update),
    { class: Activity, listener: listenActivity },
  );

  assertThrows(
    () => listeners.add(Activity, listenActivity),
+13 −3
Original line number Diff line number Diff line
@@ -25,9 +25,13 @@ export class InboxListenerSet<TContextData> {
    );
  }

  dispatch<TActivity extends Activity>(
  dispatchWithClass<TActivity extends Activity>(
    activity: TActivity,
  ): InboxListener<TContextData, TActivity> | null {
  ): {
    // deno-lint-ignore no-explicit-any
    class: new (...args: any[]) => Activity;
    listener: InboxListener<TContextData, TActivity>;
  } | null {
    // deno-lint-ignore no-explicit-any
    let cls: new (...args: any[]) => Activity = activity
      // deno-lint-ignore no-explicit-any
@@ -42,6 +46,12 @@ export class InboxListenerSet<TContextData> {
      cls = globalThis.Object.getPrototypeOf(cls);
    }
    const listener = inboxListeners.get(cls)!;
    return listener;
    return { class: cls, listener };
  }

  dispatch<TActivity extends Activity>(
    activity: TActivity,
  ): InboxListener<TContextData, TActivity> | null {
    return this.dispatchWithClass(activity)?.listener ?? null;
  }
}
+144 −93
Original line number Diff line number Diff line
@@ -495,17 +495,20 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {

  #listenQueue(ctxData: TContextData, message: Message): Promise<void> {
    const tracer = this.#getTracer();
    return withContext({ messageId: message.id }, async () => {
      if (message.type === "outbox") {
    const extractedContext = propagation.extract(
      context.active(),
      message.traceContext,
    );
    return withContext({ messageId: message.id }, async () => {
      if (message.type === "outbox") {
        await tracer.startActiveSpan(
          "activitypub.outbox",
          {
            kind: SpanKind.CONSUMER,
            attributes: { "activitypub.activity.type": message.activityType },
            attributes: {
              "activitypub.activity.type": message.activityType,
              "activitypub.activity.retries": message.attempt,
            },
          },
          extractedContext,
          async (span) => {
@@ -513,7 +516,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
              span.setAttribute("activitypub.activity.id", message.activityId);
            }
            try {
              await this.#listenOutboxMessage(ctxData, message);
              await this.#listenOutboxMessage(ctxData, message, span);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
@@ -526,7 +529,29 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
          },
        );
      } else if (message.type === "inbox") {
        await this.#listenInboxMessage(ctxData, message);
        await tracer.startActiveSpan(
          "activitypub.inbox",
          {
            kind: SpanKind.CONSUMER,
            attributes: {
              "activitypub.shared_inbox": message.identifier == null,
            },
          },
          extractedContext,
          async (span) => {
            try {
              await this.#listenInboxMessage(ctxData, message, span);
            } catch (e) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
                message: String(e),
              });
              throw e;
            } finally {
              span.end();
            }
          },
        );
      }
    });
  }
@@ -534,6 +559,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
  async #listenOutboxMessage(
    _: TContextData,
    message: OutboxMessage,
    span: Span,
  ): Promise<void> {
    const logger = getLogger(["fedify", "federation", "outbox"]);
    const logData = {
@@ -571,6 +597,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
        tracerProvider: this.tracerProvider,
      });
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) });
      const activity = await Activity.fromJsonLd(message.activity, {
        contextLoader: this.contextLoader,
        documentLoader: rsaKeyPair == null
@@ -627,6 +654,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
  async #listenInboxMessage(
    ctxData: TContextData,
    message: InboxMessage,
    span: Span,
  ): Promise<void> {
    const logger = getLogger(["fedify", "federation", "inbox"]);
    const baseUrl = new URL(message.baseUrl);
@@ -649,6 +677,10 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
      }
    }
    const activity = await Activity.fromJsonLd(message.activity, context);
    span.setAttribute("activitypub.activity.type", getTypeId(activity).href);
    if (activity.id != null) {
      span.setAttribute("activitypub.activity.id", activity.id.href);
    }
    const cacheKey = activity.id == null ? null : [
      ...this.kvPrefixes.activityIdempotence,
      activity.id.href,
@@ -664,8 +696,12 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
        return;
      }
    }
    const listener = this.inboxListeners?.dispatch(activity);
    if (listener == null) {
    await this.#getTracer().startActiveSpan(
      "activitypub.dispatch_inbox_listener",
      { kind: SpanKind.INTERNAL },
      async (span) => {
        const dispatched = this.inboxListeners?.dispatchWithClass(activity);
        if (dispatched == null) {
          logger.error(
            "Unsupported activity type:\n{activity}",
            {
@@ -675,8 +711,15 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
              trial: message.attempt,
            },
          );
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: `Unsupported activity type: ${getTypeId(activity).href}`,
          });
          span.end();
          return;
        }
        const { class: cls, listener } = dispatched;
        span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`);
        try {
          await listener(
            context.toInboxContext(
@@ -743,6 +786,11 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
              },
            );
          }
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: String(error),
          });
          span.end();
          return;
        }
        if (cacheKey != null) {
@@ -758,6 +806,9 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            recipient: message.identifier,
          },
        );
        span.end();
      },
    );
  }

  startQueue(
Loading