Loading src/federation/handler.ts +80 −71 Original line number Diff line number Diff line import { getLogger } from "@logtape/logtape"; import type { Span, TracerProvider } from "@opentelemetry/api"; import type { TracerProvider } from "@opentelemetry/api"; import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api"; import { accepts } from "@std/http/negotiation"; import metadata from "../deno.json" with { type: "json" }; Loading @@ -10,7 +10,6 @@ import { detachSignature, verifyJsonLd } from "../sig/ld.ts"; import { doesActorOwnKey } from "../sig/owner.ts"; import { verifyObject } from "../sig/proof.ts"; import type { Recipient } from "../vocab/actor.ts"; import { getTypeId } from "../vocab/type.ts"; import { Activity, CryptographicKey, Loading Loading @@ -197,46 +196,13 @@ export interface CollectionHandlerParameters< onNotAcceptable(request: Request): Response | Promise<Response>; } export function handleCollection< export async function handleCollection< TItem extends URL | Object | Link | Recipient, TContext extends RequestContext<TContextData>, TContextData, TFilter, >( request: Request, params: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>, ): Promise<Response> { const name = params.name.trim().replace(/\s+/g, "_"); const tracerProvider = params.tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); const url = new URL(request.url); const cursor = url.searchParams.get("cursor"); return tracer.startActiveSpan( cursor == null ? `activitypub.dispatch_collection ${name}` : `activitypub.dispatch_collection_page ${name}`, { kind: SpanKind.SERVER }, async (span) => { try { return await handleCollectionInternal(request, cursor, params, span); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } async function handleCollectionInternal< TItem extends URL | Object | Link | Recipient, TContext extends RequestContext<TContextData>, TContextData, TFilter, >( request: Request, cursor: string | null, { name, identifier, Loading @@ -245,12 +211,17 @@ async function handleCollectionInternal< filterPredicate, context, collectionCallbacks, tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>, span: Span, ): Promise<Response> { const spanName = name.trim().replace(/\s+/g, "_"); tracerProvider = tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); const url = new URL(request.url); const cursor = url.searchParams.get("cursor"); if (collectionCallbacks == null) return await onNotFound(request); let collection: OrderedCollection | OrderedCollectionPage; const baseUri = uriGetter(identifier); Loading @@ -260,13 +231,24 @@ async function handleCollectionInternal< identifier, ); const totalItems = await collectionCallbacks.counter?.(context, identifier); if (firstCursor == null) { const itemsOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection ${spanName}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": baseUri.href, "activitypub.collection.type": OrderedCollection.typeId.href, }, }, async (span) => { if (totalItems != null) { span.setAttribute( "activitypub.collection.total_items", Number(totalItems), ); } if (firstCursor == null) { try { const page = await collectionCallbacks.dispatcher( context, identifier, Loading @@ -279,10 +261,20 @@ async function handleCollectionInternal< } const { items } = page; span.setAttribute("fedify.collection.items", items.length); return items; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (itemsOrResponse instanceof Response) return itemsOrResponse; collection = new OrderedCollection({ id: baseUri, totalItems: totalItems == null ? null : Number(totalItems), items: filterCollectionItems(items, name, filterPredicate), items: filterCollectionItems(itemsOrResponse, name, filterPredicate), }); } else { const lastCursor = await collectionCallbacks.lastCursor?.( Loading @@ -304,9 +296,20 @@ async function handleCollectionInternal< }); } } else { span.setAttribute("fedify.collection.cursor", cursor); const uri = new URL(baseUri); uri.searchParams.set("cursor", cursor); const pageOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection_page ${name}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": uri.href, "activitypub.collection.type": OrderedCollectionPage.typeId.href, "fedify.collection.cursor": cursor, }, }, async (span) => { try { const page = await collectionCallbacks.dispatcher( context, identifier, Loading @@ -317,8 +320,18 @@ async function handleCollectionInternal< span.setStatus({ code: SpanStatusCode.ERROR }); return await onNotFound(request); } const { items, prevCursor, nextCursor } = page; span.setAttribute("fedify.collection.items", items.length); span.setAttribute("fedify.collection.items", page.items.length); return page; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (pageOrResponse instanceof Response) return pageOrResponse; const { items, prevCursor, nextCursor } = pageOrResponse; let prev = null; if (prevCursor != null) { prev = new URL(context.url); Loading Loading @@ -354,10 +367,6 @@ async function handleCollectionInternal< return await onUnauthorized(request); } } if (collection.id != null) { span.setAttribute("activitypub.collection.id", collection.id.href); } span.setAttribute("activitypub.collection.type", getTypeId(collection).href); const jsonLd = await collection.toJsonLd(context); return new Response(JSON.stringify(jsonLd), { headers: { Loading Loading
src/federation/handler.ts +80 −71 Original line number Diff line number Diff line import { getLogger } from "@logtape/logtape"; import type { Span, TracerProvider } from "@opentelemetry/api"; import type { TracerProvider } from "@opentelemetry/api"; import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api"; import { accepts } from "@std/http/negotiation"; import metadata from "../deno.json" with { type: "json" }; Loading @@ -10,7 +10,6 @@ import { detachSignature, verifyJsonLd } from "../sig/ld.ts"; import { doesActorOwnKey } from "../sig/owner.ts"; import { verifyObject } from "../sig/proof.ts"; import type { Recipient } from "../vocab/actor.ts"; import { getTypeId } from "../vocab/type.ts"; import { Activity, CryptographicKey, Loading Loading @@ -197,46 +196,13 @@ export interface CollectionHandlerParameters< onNotAcceptable(request: Request): Response | Promise<Response>; } export function handleCollection< export async function handleCollection< TItem extends URL | Object | Link | Recipient, TContext extends RequestContext<TContextData>, TContextData, TFilter, >( request: Request, params: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>, ): Promise<Response> { const name = params.name.trim().replace(/\s+/g, "_"); const tracerProvider = params.tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); const url = new URL(request.url); const cursor = url.searchParams.get("cursor"); return tracer.startActiveSpan( cursor == null ? `activitypub.dispatch_collection ${name}` : `activitypub.dispatch_collection_page ${name}`, { kind: SpanKind.SERVER }, async (span) => { try { return await handleCollectionInternal(request, cursor, params, span); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } async function handleCollectionInternal< TItem extends URL | Object | Link | Recipient, TContext extends RequestContext<TContextData>, TContextData, TFilter, >( request: Request, cursor: string | null, { name, identifier, Loading @@ -245,12 +211,17 @@ async function handleCollectionInternal< filterPredicate, context, collectionCallbacks, tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>, span: Span, ): Promise<Response> { const spanName = name.trim().replace(/\s+/g, "_"); tracerProvider = tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); const url = new URL(request.url); const cursor = url.searchParams.get("cursor"); if (collectionCallbacks == null) return await onNotFound(request); let collection: OrderedCollection | OrderedCollectionPage; const baseUri = uriGetter(identifier); Loading @@ -260,13 +231,24 @@ async function handleCollectionInternal< identifier, ); const totalItems = await collectionCallbacks.counter?.(context, identifier); if (firstCursor == null) { const itemsOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection ${spanName}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": baseUri.href, "activitypub.collection.type": OrderedCollection.typeId.href, }, }, async (span) => { if (totalItems != null) { span.setAttribute( "activitypub.collection.total_items", Number(totalItems), ); } if (firstCursor == null) { try { const page = await collectionCallbacks.dispatcher( context, identifier, Loading @@ -279,10 +261,20 @@ async function handleCollectionInternal< } const { items } = page; span.setAttribute("fedify.collection.items", items.length); return items; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (itemsOrResponse instanceof Response) return itemsOrResponse; collection = new OrderedCollection({ id: baseUri, totalItems: totalItems == null ? null : Number(totalItems), items: filterCollectionItems(items, name, filterPredicate), items: filterCollectionItems(itemsOrResponse, name, filterPredicate), }); } else { const lastCursor = await collectionCallbacks.lastCursor?.( Loading @@ -304,9 +296,20 @@ async function handleCollectionInternal< }); } } else { span.setAttribute("fedify.collection.cursor", cursor); const uri = new URL(baseUri); uri.searchParams.set("cursor", cursor); const pageOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection_page ${name}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": uri.href, "activitypub.collection.type": OrderedCollectionPage.typeId.href, "fedify.collection.cursor": cursor, }, }, async (span) => { try { const page = await collectionCallbacks.dispatcher( context, identifier, Loading @@ -317,8 +320,18 @@ async function handleCollectionInternal< span.setStatus({ code: SpanStatusCode.ERROR }); return await onNotFound(request); } const { items, prevCursor, nextCursor } = page; span.setAttribute("fedify.collection.items", items.length); span.setAttribute("fedify.collection.items", page.items.length); return page; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (pageOrResponse instanceof Response) return pageOrResponse; const { items, prevCursor, nextCursor } = pageOrResponse; let prev = null; if (prevCursor != null) { prev = new URL(context.url); Loading Loading @@ -354,10 +367,6 @@ async function handleCollectionInternal< return await onUnauthorized(request); } } if (collection.id != null) { span.setAttribute("activitypub.collection.id", collection.id.href); } span.setAttribute("activitypub.collection.type", getTypeId(collection).href); const jsonLd = await collection.toJsonLd(context); return new Response(JSON.stringify(jsonLd), { headers: { Loading