Loading src/mq.ts +12 −8 Original line number Diff line number Diff line Loading @@ -98,7 +98,7 @@ export class PostgresMessageQueue implements MessageQueue { await this.initialize(); const { signal } = options; const poll = async () => { if (signal?.aborted) return; while (!signal?.aborted) { const query = this.#sql` DELETE FROM ${this.#sql(this.#tableName)} WHERE id = ( Loading @@ -112,11 +112,15 @@ export class PostgresMessageQueue implements MessageQueue { `.execute(); const cancel = query.cancel.bind(query); signal?.addEventListener("abort", cancel); let i = 0; for (const message of await query) { if (signal?.aborted) return; await handler(JSON.parse(message.message)); i++; } signal?.removeEventListener("abort", cancel); if (i < 1) break; } }; const timeouts = new Set<ReturnType<typeof setTimeout>>(); const listen = await this.#sql.listen( Loading Loading
src/mq.ts +12 −8 Original line number Diff line number Diff line Loading @@ -98,7 +98,7 @@ export class PostgresMessageQueue implements MessageQueue { await this.initialize(); const { signal } = options; const poll = async () => { if (signal?.aborted) return; while (!signal?.aborted) { const query = this.#sql` DELETE FROM ${this.#sql(this.#tableName)} WHERE id = ( Loading @@ -112,11 +112,15 @@ export class PostgresMessageQueue implements MessageQueue { `.execute(); const cancel = query.cancel.bind(query); signal?.addEventListener("abort", cancel); let i = 0; for (const message of await query) { if (signal?.aborted) return; await handler(JSON.parse(message.message)); i++; } signal?.removeEventListener("abort", cancel); if (i < 1) break; } }; const timeouts = new Set<ReturnType<typeof setTimeout>>(); const listen = await this.#sql.listen( Loading