Unverified Commit 49db8830 authored by Hong Minhee's avatar Hong Minhee
Browse files

Allow `sendActivity`/`forwardActivity` to reject

parent 5c16bd70
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -59,6 +59,9 @@ To be released.

 -  Added `getTypeId()` function.

 -  `Context.sendActivity()` and `InboxContext.forwardActivity()` methods now
    reject when they fail to enqueue the task.  [[#192]]

 -  Fedify now supports OpenTelemetry for tracing.  [[#170]]

     -  Added `Context.tracerProvider` property.
@@ -103,6 +106,7 @@ To be released.
[#173]: https://github.com/dahlia/fedify/issues/173
[#183]: https://github.com/dahlia/fedify/pull/183
[#186]: https://github.com/dahlia/fedify/pull/186
[#192]: https://github.com/dahlia/fedify/issues/192


Version 1.2.8
+40 −4
Original line number Diff line number Diff line
@@ -568,7 +568,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            "#{attempt}); retry...:\n{error}",
          { ...logData, error },
        );
        this.outboxQueue?.enqueue(
        await this.outboxQueue?.enqueue(
          {
            ...message,
            attempt: message.attempt + 1,
@@ -690,7 +690,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            recipient: message.identifier,
          },
        );
        this.inboxQueue?.enqueue(
        await this.inboxQueue?.enqueue(
          {
            ...message,
            attempt: message.attempt + 1,
@@ -1975,6 +1975,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
      keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
    }
    if (!this.manuallyStartQueue) this.#startQueue(contextData);
    const promises: Promise<void>[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
        type: "outbox",
@@ -1995,7 +1996,24 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
            ),
        },
      };
      this.outboxQueue.enqueue(message);
      promises.push(this.outboxQueue.enqueue(message));
    }
    const results = await Promise.allSettled(promises);
    const errors = results
      .filter((r) => r.status === "rejected")
      .map((r) => r.reason);
    if (errors.length > 0) {
      logger.error(
        "Failed to enqueue activity {activityId} to send later: {errors}",
        { activityId: activity.id!.href, errors },
      );
      if (errors.length > 1) {
        throw new AggregateError(
          errors,
          `Failed to enqueue activity ${activityId} to send later.`,
        );
      }
      throw errors[0];
    }
  }

@@ -3216,6 +3234,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
      const privateKeyJwk = await exportJwk(privateKey);
      keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
    }
    const promises: Promise<void>[] = [];
    for (const inbox in inboxes) {
      const message: OutboxMessage = {
        type: "outbox",
@@ -3230,7 +3249,24 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
        attempt: 0,
        headers: {},
      };
      this.federation.outboxQueue.enqueue(message);
      promises.push(this.federation.outboxQueue.enqueue(message));
    }
    const results = await Promise.allSettled(promises);
    const errors: unknown[] = results
      .filter((r) => r.status === "rejected")
      .map((r) => r.reason);
    if (errors.length > 0) {
      logger.error(
        "Failed to enqueue activity {activityId} to forward later:\n{errors}",
        { activityId: this.activityId, errors },
      );
      if (errors.length > 1) {
        throw new AggregateError(
          errors,
          `Failed to enqueue activity ${this.activityId} to forward later.`,
        );
      }
      throw errors[0];
    }
  }
}