Unverified Commit ba3f49a7 authored by Hong Minhee's avatar Hong Minhee
Browse files

Abstract key-value store and message queue

parent 20aa8257
Loading
Loading
Loading
Loading
+24 −0
Original line number Diff line number Diff line
@@ -13,6 +13,30 @@ Version 0.5.0

To be released.

 -  Abstract key-value store for caching.

     -  Added `KvStore` interface.
     -  Added `KvStoreSetOptions` interface.
     -  Added `KvKey` type.
     -  Added `MemoryKvStore` class.
     -  `KvCacheParameters.kv` option now accepts a `KvStore` instead of
        `Deno.Kv`.
     -  `KvCacheParameters.prefix` option now accepts a `KvKey` instead of
        `Deno.KvKey`.
     -  `FederationParameters.kv` option now accepts a `KvStore` instead of
        `Deno.Kv`.
     -  `FederationKvPrefixes.activityIdempotence` option now accepts a `KvKey`
        instead of `Deno.KvKey`.
     -  `FederationKvPrefixes.remoteDocument` option now accepts a `KvKey`
        instead of `Deno.KvKey`.

 -  Abstract message queue for outgoing activities.

     -  Added `MessageQueue` interface.
     -  Added `MessageQueueEnqueueOptions` interface.
     -  Added `InProcessMessageQueue` class.
     -  Added `FederationParameters.queue` option.

 -  Removed dependency on *jose*.

     -  Added `exportSpki()` function.
+18 −5
Original line number Diff line number Diff line
@@ -40,13 +40,14 @@ properties. Some of them are required:

### `kv`

*Required.*  The `~FederationParameters.kv` property is a [`Deno.Kv`] instance
*Required.*  The `~FederationParameters.kv` property is a `KvStore` instance
that the `Federation` object uses to store several kinds of cache data and
to maintain the queue of outgoing activities.  Usually instantiated by
calling the [`Deno.openKv()`] function.
to maintain the queue of outgoing activities.

[`Deno.Kv`]: https://deno.land/api?unstable&s=Deno.Kv
[`Deno.openKv()`]: https://deno.land/api?unstable&s=Deno.openKv
`KvStore` is an abstract interface that represents a key-value store.
Currently, there is only one implementation of `KvStore`, which is the
`MemoryKvStore` class, but you can define your own `KvStore` implementation
if you want to use a different key-value store.

### `kvPrefixes`

@@ -62,6 +63,18 @@ that the `Federation` object uses:
:   The key prefix used for storing remote JSON-LD documents.
    `["_fedify", "remoteDocument"]` by default.

### `queue`

The `~FederationParameters.queue` property is a `MessageQueue` instance that
the `Federation` object uses to maintain the queue of outgoing activities.
If you don't provide this option, activities will not be queued and will
be sent immediately.

`MessageQueue` is an abstract interface that represents a message queue.
Currently, there is only one implementation of `MessageQueue`, which is the
`InProcessMessageQueue` class, but you can define your own `MessageQueue`
implementation if you want to use a different message queue.

### `documentLoader`

A JSON-LD document loader function that the `Federation` object uses to
+10 −5
Original line number Diff line number Diff line
import { Temporal } from "@js-temporal/polyfill";
import { parse } from "@std/semver";
import { dirname, join } from "@std/path";
import { Federation } from "@fedify/fedify/federation";
import {
  Federation,
  InProcessMessageQueue,
  MemoryKvStore,
} from "@fedify/fedify/federation";
import {
  Accept,
  Activity,
@@ -23,15 +27,16 @@ import {
  getFollowers,
  removeFollower,
} from "../models/follower.ts";
import { openKv } from "../models/kv.ts";
import { countPosts, getPosts, toArticle } from "../models/post.ts";

// The `Federation<TContextData>` object is a registry that registers
// federation-related callbacks:
export const federation = new Federation<void>({
  // The following Deno KV storage is used for several purposes, such as
  // cache and outbox queue:
  kv: await openKv(),
  // The following key-value storage is used for internal cache:
  kv: new MemoryKvStore(),

  // The following message queue is used for maintaining outgoing activities:
  queue: new InProcessMessageQueue(),

  // The following option is useful for local development, as Fresh's dev
  // server does not support HTTPS:
+3 −2
Original line number Diff line number Diff line
{
  "imports": {
    "@cfworker/json-schema": "npm:@cfworker/json-schema@^1.12.8",
    "@deno/dnt": "jsr:@deno/dnt@^0.41.1",
    "@fedify/fedify": "../../mod.ts",
    "@fedify/fedify/federation": "../../federation/mod.ts",
    "@fedify/fedify/httpsig": "../../httpsig/mod.ts",
@@ -16,8 +17,8 @@
    "@std/collections": "jsr:@std/collections@^0.220.1",
    "@std/encoding": "jsr:@std/encoding@^0.220.1",
    "@std/encoding/base64": "jsr:@std/encoding@^0.220.1/base64",
    "@std/http": "jsr:@std/http@^0.220.1",
    "@std/json": "jsr:@std/json@^0.220.1",
    "@std/http/negotiation": "jsr:@std/http@^0.220.1/negotiation",
    "@std/json/common": "jsr:@std/json@^0.220.1/common",
    "@std/path": "jsr:@std/path@^0.220.1",
    "@std/semver": "jsr:@std/semver@^0.220.1",
    "@std/testing": "jsr:@std/testing@^0.220.1",
+9 −5
Original line number Diff line number Diff line
import { Temporal } from "@js-temporal/polyfill";
import { accepts } from "@std/http";
import { doesActorOwnKey, verify } from "../httpsig/mod.ts";
import type { DocumentLoader } from "../runtime/docloader.ts";
@@ -17,6 +18,7 @@ import type {
  InboxListener,
} from "./callback.ts";
import type { RequestContext } from "./context.ts";
import type { KvKey, KvStore } from "./kv.ts";

export function acceptsJsonLd(request: Request): boolean {
  const types = accepts(request);
@@ -210,8 +212,8 @@ export async function handleCollection<
export interface InboxHandlerParameters<TContextData> {
  handle: string | null;
  context: RequestContext<TContextData>;
  kv: Deno.Kv;
  kvPrefix: Deno.KvKey;
  kv: KvStore;
  kvPrefix: KvKey;
  actorDispatcher?: ActorDispatcher<TContextData>;
  inboxListeners: Map<
    new (...args: unknown[]) => Activity,
@@ -276,10 +278,12 @@ export async function handleInbox<TContextData>(
      headers: { "Content-Type": "text/plain; charset=utf-8" },
    });
  }
  const cacheKey = activity.id == null ? null : [...kvPrefix, activity.id.href];
  const cacheKey = activity.id == null
    ? null
    : [...kvPrefix, activity.id.href] satisfies KvKey;
  if (cacheKey != null) {
    const cached = await kv.get(cacheKey);
    if (cached != null && cached.value === true) {
    if (cached === true) {
      return new Response(
        `Activity <${activity.id}> has already been processed.`,
        {
@@ -330,7 +334,7 @@ export async function handleInbox<TContextData>(
    });
  }
  if (cacheKey != null) {
    await kv.set(cacheKey, true, { expireIn: 1000 * 60 * 60 * 24 });
    await kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }) });
  }
  return new Response("", {
    status: 202,
Loading