Unverified Commit 4bcfdf27 authored by Hong Minhee's avatar Hong Minhee
Browse files

Extract #setWithCasRetry helper to reduce duplication

Both #storeWithCas and #updateTraceSummary share the same pattern of
CAS retry with fallback to set. Extract this into a reusable helper.

https://github.com/fedify-dev/fedify/pull/502#discussion_r2642988658
parent ccf4dba1
Loading
Loading
Loading
Loading
+53 −54
Original line number Diff line number Diff line
@@ -442,6 +442,27 @@ export class FedifySpanExporter implements SpanExporter {
    }
  }

  async #setWithCasRetry<T>(
    key: KvKey,
    transform: (existing: T | undefined) => T,
    options?: KvStoreSetOptions,
  ): Promise<void> {
    if (this.#kv.cas != null) {
      for (let attempt = 0; attempt < 3; attempt++) {
        const existing = await this.#kv.get<T>(key);
        const newValue = transform(existing);
        if (await this.#kv.cas(key, existing, newValue, options)) {
          return;
        }
      }
    }

    // Fallback to non-atomic set if CAS is not available or fails
    const existing = await this.#kv.get<T>(key);
    const newValue = transform(existing);
    await this.#kv.set(key, newValue, options);
  }

  async #updateTraceSummary(
    record: TraceActivityRecord,
    options?: KvStoreSetOptions,
@@ -452,9 +473,9 @@ export class FedifySpanExporter implements SpanExporter {
      record.traceId,
    ] as KvKey;

    const createOrUpdateSummary = (
      existing: TraceSummary | undefined,
    ): TraceSummary => {
    await this.#setWithCasRetry<TraceSummary>(
      summaryKey,
      (existing) => {
        const summary: TraceSummary = existing != null
          ? {
            traceId: existing.traceId,
@@ -473,22 +494,9 @@ export class FedifySpanExporter implements SpanExporter {
          summary.activityTypes.push(record.activityType);
        }
        return summary;
    };

    if (this.#kv.cas != null) {
      for (let attempt = 0; attempt < 3; attempt++) {
        const existing = await this.#kv.get<TraceSummary>(summaryKey);
        const summary = createOrUpdateSummary(existing);
        if (await this.#kv.cas(summaryKey, existing, summary, options)) {
          return;
        }
      }
    }

    // Fallback to non-atomic set if CAS is not available or fails
    const existing = await this.#kv.get<TraceSummary>(summaryKey);
    const summary = createOrUpdateSummary(existing);
    await this.#kv.set(summaryKey, summary, options);
      },
      options,
    );
  }

  async #storeWithCas(
@@ -497,24 +505,15 @@ export class FedifySpanExporter implements SpanExporter {
  ): Promise<void> {
    const key: KvKey = [...this.#keyPrefix, record.traceId] as KvKey;

    // Retry CAS operation up to 3 times
    for (let attempt = 0; attempt < 3; attempt++) {
      const existing = await this.#kv.get<TraceActivityRecord[]>(key);
    await this.#setWithCasRetry<TraceActivityRecord[]>(
      key,
      (existing) => {
        const records = existing ?? [];
        records.push(record);

      const success = await this.#kv.cas!(key, existing, records, options);
      if (success) {
        await this.#updateTraceSummary(record, options);
        return;
      }
    }

    // If CAS fails after retries, fall back to simple set
    const existing = await this.#kv.get<TraceActivityRecord[]>(key);
    const records = existing ?? [];
    records.push(record);
    await this.#kv.set(key, records, options);
        return records;
      },
      options,
    );
    await this.#updateTraceSummary(record, options);
  }