From 86a006387002d9d5e4efabecb401a58086ca8040 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 8 Feb 2025 17:16:58 -0700 Subject: [PATCH 01/12] [wip] implement bullmq & initial new locking --- package-lock.json | 266 ++++++++++++++++++++++++ packages/server/package.json | 1 + packages/server/src/jobs/bullmq.ts | 8 + packages/server/src/lib/SocketServer.ts | 13 +- packages/server/src/lib/redis.ts | 24 +++ 5 files changed, 302 insertions(+), 10 deletions(-) create mode 100644 packages/server/src/jobs/bullmq.ts diff --git a/package-lock.json b/package-lock.json index e98548f..5d0741f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2965,6 +2965,11 @@ "@swc/helpers": "^0.5.0" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -3503,6 +3508,78 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz", + "integrity": "sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz", + "integrity": "sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz", + "integrity": "sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz", + "integrity": "sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz", + "integrity": "sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz", + "integrity": "sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@nextui-org/accordion": { "version": "2.2.7", "resolved": "https://registry.npmjs.org/@nextui-org/accordion/-/accordion-2.2.7.tgz", @@ -10165,6 +10242,20 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, + "node_modules/bullmq": { + "version": "5.40.2", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.40.2.tgz", + "integrity": "sha512-Cn4NUpwGAF4WnuXR2kTZCTAUEUHajSCn/IqiDG9ry1kVvAwwwg1Ati3J5HN2uZjqD5PBfNDXYnsc2+0PzakDwg==", + "dependencies": { + "cron-parser": "^4.9.0", + "ioredis": "^5.4.1", + "msgpackr": "^1.11.2", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", + "tslib": "^2.0.0", + "uuid": "^9.0.0" + } + }, "node_modules/bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -10700,6 +10791,17 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "devOptional": true }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -10873,6 +10975,14 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -13425,6 +13535,50 @@ "tslib": "2" } }, + "node_modules/ioredis": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.5.0.tgz", + "integrity": "sha512-7CutT89g23FfSa8MDoIFs2GYYa0PaNiW/OrT+nRyjRXHDZd17HmIgy+reOQ/yhh72NznNjGuS8kbCAcA4Ro4mw==", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/debug": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", + "integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ioredis/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -14842,6 +14996,16 @@ "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.memoize": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.memoize/-/lodash.memoize-4.1.2.tgz", @@ -14892,6 +15056,14 @@ "loose-envify": "cli.js" } }, + "node_modules/luxon": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz", + "integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==", + "engines": { + "node": ">=12" + } + }, "node_modules/magic-string": { "version": "0.30.8", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz", @@ -15096,6 +15268,35 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" }, + "node_modules/msgpackr": { + "version": "1.11.2", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.2.tgz", + "integrity": "sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==", + "optionalDependencies": { + "msgpackr-extract": "^3.0.2" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz", + "integrity": "sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.2.2" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3" + } + }, "node_modules/mz": { "version": "2.7.0", "resolved": "https://registry.npmjs.org/mz/-/mz-2.7.0.tgz", @@ -15152,6 +15353,11 @@ "react-dom": "^16.8 || ^17 || ^18 || ^19 || ^19.0.0-rc" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==" + }, "node_modules/node-addon-api": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-7.1.1.tgz", @@ -15180,6 +15386,29 @@ } } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", + "integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==", + "optional": true, + "dependencies": { + "detect-libc": "^2.0.1" + }, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, + "node_modules/node-gyp-build-optional-packages/node_modules/detect-libc": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.0.3.tgz", + "integrity": "sha512-bwy0MGW55bG41VqxxypOsdSdGqLwXPI/focwgTYCFMbdUiBAxLg9CFzG08sz2aqzknwiX7Hkl0bQENjg8iLByw==", + "optional": true, + "engines": { + "node": ">=8" + } + }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -16405,6 +16634,14 @@ "@redis/time-series": "1.1.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, "node_modules/redis-mock": { "version": "0.56.3", "resolved": "https://registry.npmjs.org/redis-mock/-/redis-mock-0.56.3.tgz", @@ -16415,6 +16652,17 @@ "node": ">=6" } }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.9.tgz", @@ -17332,6 +17580,11 @@ "node": ">=8" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", @@ -18907,6 +19160,18 @@ "node": ">= 0.4.0" } }, + "node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", @@ -19585,6 +19850,7 @@ "@sc07-canvas/lib": "^1.0.0", "@sentry/node": "^8.47.0", "body-parser": "^1.20.2", + "bullmq": "^5.40.2", "connect-redis": "^8.0.1", "cors": "^2.8.5", "express": "^4.21.2", diff --git a/packages/server/package.json b/packages/server/package.json index e081697..707c987 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -43,6 +43,7 @@ "@sc07-canvas/lib": "^1.0.0", "@sentry/node": "^8.47.0", "body-parser": "^1.20.2", + "bullmq": "^5.40.2", "connect-redis": "^8.0.1", "cors": "^2.8.5", "express": "^4.21.2", diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts new file mode 100644 index 0000000..50dd159 --- /dev/null +++ b/packages/server/src/jobs/bullmq.ts @@ -0,0 +1,8 @@ +import { Queue } from "bullmq"; + +const mainQueue = new Queue("main", { + prefix: "canvas_main_queue", + connection: { + url: process.env.REDIS_HOST, + }, +}); diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index 754a60f..9421912 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -70,13 +70,6 @@ type Socket = RawSocket; export class SocketServer { static instance: SocketServer; io: Server; - /** - * Prevent users from time attacking pixel placements to place more pixels than stacked - * - * @key user sub (grant@grants.cafe) - * @value timestamp - */ - userPlaceLock = new Map(); constructor(server: http.Server) { SocketServer.instance = this; @@ -275,13 +268,13 @@ export class SocketServer { // force a user data update await user.update(true); - if (this.userPlaceLock.has(user.sub)) { + try { + await Redis.acquireLock("user_pixel", user.sub); + } catch (_e) { ack({ success: false, error: "pixel_already_pending" }); return; } - this.userPlaceLock.set(user.sub, Date.now()); - if (bypassCooldown && !user.isModerator) { // only moderators can do this ack({ success: false, error: "invalid_pixel" }); diff --git a/packages/server/src/lib/redis.ts b/packages/server/src/lib/redis.ts index 6da750f..7bac228 100644 --- a/packages/server/src/lib/redis.ts +++ b/packages/server/src/lib/redis.ts @@ -2,6 +2,7 @@ import { RedisClientType } from "@redis/client"; import { createClient } from "redis"; import { getLogger } from "./Logger"; +import { User } from "@prisma/client"; const Logger = getLogger("REDIS"); @@ -23,6 +24,9 @@ interface IRedisKeys { // pub/sub channels channel_heatmap(): string; + + // locks + lock(id: string): string; } /** @@ -36,6 +40,7 @@ const RedisKeys: IRedisKeys = { canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`, socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`, channel_heatmap: () => `CANVAS:HEATMAP`, + lock: (id: string) => `CANVAS:LOCK:${id}`, }; class _Redis { @@ -126,6 +131,25 @@ class _Redis { ): (...params: Parameters) => string { return (...params) => this.key(key, ...params); } + + async acquireLock(type: "user_pixel", userSub: User["sub"]) { + const client = await this.getClient("MAIN"); + const resp = await client.set( + this.key("lock", type + "_" + userSub), + Date.now(), + { + EX: 60 * 1000, + NX: true, + } + ); + + if (!resp) throw new Error("lock"); + } + + async releaseLock(type: "user_pixel", userSub: User["sub"]) { + const client = await this.getClient("MAIN"); + await client.del(this.key("lock", type + "_" + userSub)); + } } export const Redis = new _Redis(RedisKeys); -- GitLab From 62071df9b8478dd648c01e5f1256276b794ef14f Mon Sep 17 00:00:00 2001 From: Grant <3380410-grahhnt@users.noreply.gitlab.com> Date: Sun, 9 Feb 2025 13:09:11 -0700 Subject: [PATCH 02/12] rewrite user place locks to avoid setInterval hack --- packages/server/src/lib/SocketServer.ts | 165 +++++++++++------------- packages/server/src/lib/redis.ts | 64 +++++++-- 2 files changed, 123 insertions(+), 106 deletions(-) diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index 9421912..ff754cd 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -17,7 +17,7 @@ import { session } from "./Express"; import { getLogger } from "./Logger"; import { prisma } from "./prisma"; import { Recaptcha } from "./Recaptcha"; -import { Redis } from "./redis"; +import { LockExists, Redis } from "./redis"; const Logger = getLogger("SOCKET"); @@ -82,29 +82,6 @@ export class SocketServer { this.io.on("connection", this.handleConnection.bind(this)); if (process.env.NODE_ENV !== "test") { - // clear pixel locks if they have existed for more than a minute - setInterval(() => { - const oneMinuteAgo = new Date(); - oneMinuteAgo.setMinutes(oneMinuteAgo.getMinutes() - 1); - - const expired = [...this.userPlaceLock.entries()].filter( - ([_user, time]) => time < oneMinuteAgo.getTime() - ); - - if (expired.length > 0) { - Logger.warn( - "A pixel lock has existed for too long for " + - expired.length + - " users : " + - expired.map((a) => a[0]).join(",") - ); - } - - for (const expire of expired) { - this.userPlaceLock.delete(expire[0]); - } - }, 1000 * 30); - // pixel stacking // - needs to be exponential (takes longer to aquire more pixels stacked) // - convert to config options instead of hard-coded @@ -269,83 +246,85 @@ export class SocketServer { await user.update(true); try { - await Redis.acquireLock("user_pixel", user.sub); - } catch (_e) { - ack({ success: false, error: "pixel_already_pending" }); - return; - } - - if (bypassCooldown && !user.isModerator) { - // only moderators can do this - ack({ success: false, error: "invalid_pixel" }); - this.userPlaceLock.delete(user.sub); - return; - } + await Redis.useLock("user_pixel", [user.sub], async () => { + if (bypassCooldown && !user.isModerator) { + // only moderators can do this + ack({ success: false, error: "invalid_pixel" }); + return; + } - if (!bypassCooldown && user.pixelStack < 1) { - ack({ success: false, error: "pixel_cooldown" }); - this.userPlaceLock.delete(user.sub); - return; - } + if (!bypassCooldown && user.pixelStack < 1) { + ack({ success: false, error: "pixel_cooldown" }); + return; + } - if ((user.getBan()?.expires || 0) > new Date()) { - ack({ success: false, error: "banned" }); - this.userPlaceLock.delete(user.sub); - return; - } + if ((user.getBan()?.expires || 0) > new Date()) { + ack({ success: false, error: "banned" }); + return; + } - const paletteColor = await prisma.paletteColor.findFirst({ - where: { - id: pixel.color, - }, - }); - if (!paletteColor) { - ack({ - success: false, - error: "palette_color_invalid", - }); - this.userPlaceLock.delete(user.sub); - return; - } + const paletteColor = await prisma.paletteColor.findFirst({ + where: { + id: pixel.color, + }, + }); + if (!paletteColor) { + ack({ + success: false, + error: "palette_color_invalid", + }); + return; + } - const pixelAtTheSameLocation = await Canvas.getPixel(pixel.x, pixel.y); + const pixelAtTheSameLocation = await Canvas.getPixel( + pixel.x, + pixel.y + ); - if ( - pixelAtTheSameLocation && - pixelAtTheSameLocation.userId === user.sub && - pixelAtTheSameLocation.color === paletteColor.hex - ) { - ack({ success: false, error: "you_already_placed_that" }); - this.userPlaceLock.delete(user.sub); - return; - } + if ( + pixelAtTheSameLocation && + pixelAtTheSameLocation.userId === user.sub && + pixelAtTheSameLocation.color === paletteColor.hex + ) { + ack({ success: false, error: "you_already_placed_that" }); + return; + } - Recaptcha.maybeChallenge(socket); + Recaptcha.maybeChallenge(socket); - await user.modifyStack(-1); - await Canvas.setPixel( - user, - pixel.x, - pixel.y, - paletteColor.hex, - bypassCooldown - ); - // give undo capabilities - await user.setUndo( - new Date(Date.now() + Canvas.getCanvasConfig().undo.grace_period) - ); + await user.modifyStack(-1); + await Canvas.setPixel( + user, + pixel.x, + pixel.y, + paletteColor.hex, + bypassCooldown + ); + // give undo capabilities + await user.setUndo( + new Date(Date.now() + Canvas.getCanvasConfig().undo.grace_period) + ); - const newPixel: Pixel = { - x: pixel.x, - y: pixel.y, - color: pixel.color, - }; - ack({ - success: true, - data: newPixel, - }); - socket.broadcast.emit("pixel", newPixel); - this.userPlaceLock.delete(user.sub); + const newPixel: Pixel = { + x: pixel.x, + y: pixel.y, + color: pixel.color, + }; + ack({ + success: true, + data: newPixel, + }); + socket.broadcast.emit("pixel", newPixel); + }); + } catch (e) { + if (e instanceof LockExists) { + ack({ success: false, error: "pixel_already_pending" }); + } else { + ack({ success: false, error: "invalid_pixel" }); + throw e; + } + return; + } }); socket.on("undo", async (ack) => { diff --git a/packages/server/src/lib/redis.ts b/packages/server/src/lib/redis.ts index 7bac228..91b81cc 100644 --- a/packages/server/src/lib/redis.ts +++ b/packages/server/src/lib/redis.ts @@ -1,8 +1,8 @@ +import { User } from "@prisma/client"; import { RedisClientType } from "@redis/client"; import { createClient } from "redis"; import { getLogger } from "./Logger"; -import { User } from "@prisma/client"; const Logger = getLogger("REDIS"); @@ -132,23 +132,61 @@ class _Redis { return (...params) => this.key(key, ...params); } - async acquireLock(type: "user_pixel", userSub: User["sub"]) { + async acquireLock( + type: Type, + ...args: Parameters + ) { + const KEY = this.key("lock", type + "_" + args.join("_")); const client = await this.getClient("MAIN"); - const resp = await client.set( - this.key("lock", type + "_" + userSub), - Date.now(), - { - EX: 60 * 1000, - NX: true, - } - ); + const resp = await client.set(KEY, Date.now(), { + EX: 60 * 1000, + NX: true, + }); - if (!resp) throw new Error("lock"); + if (!resp) { + Logger.debug("Lock acquire failed for " + KEY); + throw new LockExists(); + } + + Logger.debug("Lock acquired for " + KEY); } - async releaseLock(type: "user_pixel", userSub: User["sub"]) { + async releaseLock( + type: Type, + ...args: Parameters + ) { + const KEY = this.key("lock", type + "_" + args.join("_")); const client = await this.getClient("MAIN"); - await client.del(this.key("lock", type + "_" + userSub)); + const time = await client.getDel(KEY); + + if (!time) { + Logger.debug("Lock failed release for " + KEY + " (not existing)"); + return; + } + + Logger.debug( + "Lock released for " + KEY + " (" + (Date.now() - parseInt(time)) + "ms)" + ); + } + + async useLock( + type: Type, + args: Parameters, + use: (() => any) | (() => Promise) + ) { + await this.acquireLock(type, ...args); + await use(); + await this.releaseLock(type, ...args); + } +} + +interface ILocks { + user_pixel(userSub: User["sub"]): void; +} + +export class LockExists extends Error { + constructor() { + super("Lock already exists"); } } -- GitLab From 56af3ce6df482509850458f3c9d9668d018d98e0 Mon Sep 17 00:00:00 2001 From: Grant <3380410-grahhnt@users.noreply.gitlab.com> Date: Sun, 9 Feb 2025 13:15:50 -0700 Subject: [PATCH 03/12] eslint rule to disable setInterval & setTimeout in server --- packages/server/.eslintrc.json | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/server/.eslintrc.json b/packages/server/.eslintrc.json index cf3c95e..80e0728 100644 --- a/packages/server/.eslintrc.json +++ b/packages/server/.eslintrc.json @@ -36,6 +36,17 @@ "varsIgnorePattern": "^_", "ignoreRestSiblings": true } + ], + "no-restricted-globals": [ + "error", + { + "name": "setInterval", + "message": "Avoid using timers. Consider using job queue." + }, + { + "name": "setTimeout", + "message": "Avoid using timers. Consider using job queue." + } ] } } -- GitLab From 41a2eddaaca5009d64fcd2d92d3d8b733d456e91 Mon Sep 17 00:00:00 2001 From: Grant <3380410-grahhnt@users.noreply.gitlab.com> Date: Sat, 15 Feb 2025 19:21:21 -0700 Subject: [PATCH 04/12] [wip] pixel stacking job queue --- packages/server/src/index.ts | 1 + packages/server/src/jobs/bullmq.ts | 2 + packages/server/src/jobs/pixelStacking.ts | 60 +++++++++++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 packages/server/src/jobs/pixelStacking.ts diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 579cc59..9153042 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -2,6 +2,7 @@ import "./lib/sentry"; // load declare module import "./types"; import "./workers/worker"; +import "./jobs/bullmq"; import { ExpressServer } from "./lib/Express"; import { getLogger } from "./lib/Logger"; diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index 50dd159..df8691f 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -6,3 +6,5 @@ const mainQueue = new Queue("main", { url: process.env.REDIS_HOST, }, }); + +import "./pixelStacking"; diff --git a/packages/server/src/jobs/pixelStacking.ts b/packages/server/src/jobs/pixelStacking.ts new file mode 100644 index 0000000..ed5374f --- /dev/null +++ b/packages/server/src/jobs/pixelStacking.ts @@ -0,0 +1,60 @@ +import { Queue, Worker } from "bullmq"; + +import { getLogger } from "../lib/Logger"; + +const Logger = getLogger(); + +const queue = new Queue("pixel-stacking", { + prefix: "canvas-stacking", + connection: { + url: process.env.REDIS_HOST, + }, +}); + +void queue + .upsertJobScheduler( + "pixel-stacking", + { + every: 1000, + }, + { + name: "pixel-stacking", + data: { + test: true, + }, + } + ) + .then((job) => { + Logger.info("pixel stacking upserted job scheduler"); + console.log(job.asJSON()); + }); + +const worker = new Worker( + queue.name, + async (job) => { + console.log("worker pixel-stacking running", job); + }, + { + connection: { + url: process.env.REDIS_HOST, + }, + } +); + +worker.on("ready", () => { + console.log("worker active"); + + queue.getJobs().then(console.log); +}); + +worker.on("error", () => { + console.log("worker error"); +}); + +worker.on("failed", () => { + console.log("job failed"); +}); + +worker.on("drained", () => { + console.log("worker drained"); +}); -- GitLab From 594e9699b9756e405f04e3564a23e7adc8731e72 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 21:31:36 -0700 Subject: [PATCH 05/12] implement pixel stacking job via bullmq --- packages/server/src/jobs/bullmq.ts | 13 +- packages/server/src/jobs/pixelStacking.ts | 144 +++++++++++++++------- packages/server/src/lib/SocketServer.ts | 52 -------- 3 files changed, 109 insertions(+), 100 deletions(-) diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index df8691f..0c2cd1e 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -1,5 +1,8 @@ import { Queue } from "bullmq"; +import { getLogger } from "../lib/Logger"; +import { Job_PixelStacking } from "./pixelStacking"; + const mainQueue = new Queue("main", { prefix: "canvas_main_queue", connection: { @@ -7,4 +10,12 @@ const mainQueue = new Queue("main", { }, }); -import "./pixelStacking"; +const Logger = getLogger(); + +const worker = Job_PixelStacking.startWorker(); +worker.on("ready", () => { + Logger.info("pixelstacking worker is ready"); +}); +worker.on("error", (err) => { + console.error("pixelstacking worker errored", err); +}); diff --git a/packages/server/src/jobs/pixelStacking.ts b/packages/server/src/jobs/pixelStacking.ts index ed5374f..7a4a36a 100644 --- a/packages/server/src/jobs/pixelStacking.ts +++ b/packages/server/src/jobs/pixelStacking.ts @@ -1,60 +1,110 @@ -import { Queue, Worker } from "bullmq"; +import { CanvasLib } from "@sc07-canvas/lib"; +import { Job, Queue, Worker } from "bullmq"; import { getLogger } from "../lib/Logger"; +import { Redis } from "../lib/redis"; +import { getClientConfig, SocketServer } from "../lib/SocketServer"; +import { User } from "../models/User"; const Logger = getLogger(); -const queue = new Queue("pixel-stacking", { - prefix: "canvas-stacking", - connection: { - url: process.env.REDIS_HOST, - }, -}); - -void queue - .upsertJobScheduler( - "pixel-stacking", - { - every: 1000, - }, - { - name: "pixel-stacking", - data: { - test: true, +export const Job_PixelStacking = new (class Job_PixelStacking { + #queue: Queue; + #worker?: Worker; + + constructor() { + this.#queue = new Queue("pixel-stacking", { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + this.setupScheduler(); + } + + private setupScheduler() { + void this.#queue + .upsertJobScheduler( + "pixel-stacking", + { + every: 1000, + }, + { + name: "pixel-stacking", + data: { + test: true, + }, + } + ) + .then(() => { + Logger.info("pixel stacking upserted job scheduler"); + }); + } + + startWorker() { + this.#worker = new Worker(this.#queue.name, this.processJob, { + connection: { + url: process.env.REDIS_HOST, }, + }); + + return this.#worker!; + } + + async processJob(job: Job) { + switch (job.name) { + case "pixel-stacking": + await runPixelStacking(); + break; } - ) - .then((job) => { - Logger.info("pixel stacking upserted job scheduler"); - console.log(job.asJSON()); - }); - -const worker = new Worker( - queue.name, - async (job) => { - console.log("worker pixel-stacking running", job); - }, - { - connection: { - url: process.env.REDIS_HOST, - }, } -); -worker.on("ready", () => { - console.log("worker active"); + get queue() { + return this.#queue; + } +})(); + +const runPixelStacking = async () => { + const DEBUG = false; - queue.getJobs().then(console.log); -}); + if (DEBUG) Logger.debug("Running pixel stacking..."); + const redis = await Redis.getClient(); + const sockets = await SocketServer.instance.io.local.fetchSockets(); -worker.on("error", () => { - console.log("worker error"); -}); + for (const socket of sockets) { + const sub = await redis.get(Redis.key("socketToSub", socket.id)); + if (!sub) { + if (DEBUG) Logger.warn(`Socket ${socket.id} has no user`); + continue; + } + + const user = await User.fromSub(sub); + if (!user) { + if (DEBUG) + Logger.warn( + `Socket ${socket.id}'s user (${sub}) does not exist in the database` + ); + continue; + } -worker.on("failed", () => { - console.log("job failed"); -}); + // time in seconds since last stack gain (including a potential bonus depending on previously remaining time) + const timeSinceLastPlace = + (Date.now() - user.lastTimeGainStarted.getTime()) / 1000; + const cooldown = CanvasLib.getPixelCooldown( + user.pixelStack + 1, + getClientConfig() + ); -worker.on("drained", () => { - console.log("worker drained"); -}); + await user.update(true); + + // this impl has the side affect of giving previously offline users all the stack upon reconnecting + if ( + timeSinceLastPlace >= cooldown && + user.pixelStack < getClientConfig().canvas.pixel.maxStack + ) { + await user.modifyStack(1); + + if (DEBUG) Logger.debug(sub + " has gained another pixel in their stack"); + } + } +}; diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index ff754cd..603fecf 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -1,6 +1,5 @@ import http from "node:http"; -import { CanvasLib } from "@sc07-canvas/lib"; import { ClientConfig, ClientToServerEvents, @@ -80,57 +79,6 @@ export class SocketServer { this.io.engine.use(session); this.io.on("connection", this.handleConnection.bind(this)); - - if (process.env.NODE_ENV !== "test") { - // pixel stacking - // - needs to be exponential (takes longer to aquire more pixels stacked) - // - convert to config options instead of hard-coded - setInterval(async () => { - const DEBUG = false; - - if (DEBUG) Logger.debug("Running pixel stacking..."); - const redis = await Redis.getClient(); - const sockets = await this.io.local.fetchSockets(); - - for (const socket of sockets) { - const sub = await redis.get(Redis.key("socketToSub", socket.id)); - if (!sub) { - if (DEBUG) Logger.warn(`Socket ${socket.id} has no user`); - continue; - } - - const user = await User.fromSub(sub); - if (!user) { - if (DEBUG) - Logger.warn( - `Socket ${socket.id}'s user (${sub}) does not exist in the database` - ); - continue; - } - - // time in seconds since last stack gain (including a potential bonus depending on previously remaining time) - const timeSinceLastPlace = - (Date.now() - user.lastTimeGainStarted.getTime()) / 1000; - const cooldown = CanvasLib.getPixelCooldown( - user.pixelStack + 1, - getClientConfig() - ); - - await user.update(true); - - // this impl has the side affect of giving previously offline users all the stack upon reconnecting - if ( - timeSinceLastPlace >= cooldown && - user.pixelStack < getClientConfig().canvas.pixel.maxStack - ) { - await user.modifyStack(1); - - if (DEBUG) - Logger.debug(sub + " has gained another pixel in their stack"); - } - } - }, 1000); - } } /** -- GitLab From 1cc07e9993eff76c41c3c49909c5aeca94676af7 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 21:51:46 -0700 Subject: [PATCH 06/12] implement heatmap generation --- packages/server/src/jobs/Jobs.ts | 33 -------------- packages/server/src/jobs/bullmq.ts | 27 +++++++++--- packages/server/src/jobs/canvas.ts | 70 ++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 40 deletions(-) delete mode 100644 packages/server/src/jobs/Jobs.ts create mode 100644 packages/server/src/jobs/canvas.ts diff --git a/packages/server/src/jobs/Jobs.ts b/packages/server/src/jobs/Jobs.ts deleted file mode 100644 index a21e80a..0000000 --- a/packages/server/src/jobs/Jobs.ts +++ /dev/null @@ -1,33 +0,0 @@ -import Canvas from "../lib/Canvas"; -import { getLogger } from "../lib/Logger"; - -const Logger = getLogger("JOB_WORKER"); - -/** - * Job scheduler - * - * This should run in a different process - */ -export class Jobs { - constructor() { - Logger.info("Starting job worker..."); - - // every 5 minutes - setInterval(this.generateHeatmap, 1000 * 60 * 5); - - this.generateHeatmap(); - } - - async generateHeatmap() { - Logger.info("Generating heatmap..."); - const now = Date.now(); - - await Canvas.generateHeatmap(); - - Logger.info( - "Generated heatmap in " + - ((Date.now() - now) / 1000).toFixed(1) + - " seconds" - ); - } -} diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index 0c2cd1e..c0a6a76 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -1,6 +1,7 @@ import { Queue } from "bullmq"; import { getLogger } from "../lib/Logger"; +import { Job_Canvas } from "./canvas"; import { Job_PixelStacking } from "./pixelStacking"; const mainQueue = new Queue("main", { @@ -12,10 +13,22 @@ const mainQueue = new Queue("main", { const Logger = getLogger(); -const worker = Job_PixelStacking.startWorker(); -worker.on("ready", () => { - Logger.info("pixelstacking worker is ready"); -}); -worker.on("error", (err) => { - console.error("pixelstacking worker errored", err); -}); +{ + const worker = Job_PixelStacking.startWorker(); + worker.on("ready", () => { + Logger.info("pixelstacking worker is ready"); + }); + worker.on("error", (err) => { + console.error("pixelstacking worker errored", err); + }); +} + +{ + const worker = Job_Canvas.startWorker(); + worker.on("ready", () => { + Logger.info("pixelstacking worker is ready"); + }); + worker.on("error", (err) => { + console.error("pixelstacking worker errored", err); + }); +} diff --git a/packages/server/src/jobs/canvas.ts b/packages/server/src/jobs/canvas.ts new file mode 100644 index 0000000..c5035b1 --- /dev/null +++ b/packages/server/src/jobs/canvas.ts @@ -0,0 +1,70 @@ +import { Job, Queue, Worker } from "bullmq"; + +import Canvas from "../lib/Canvas"; +import { getLogger } from "../lib/Logger"; + +const Logger = getLogger(); + +/** + * all jobs that are for the canvas (like heatmaps) + */ +export const Job_Canvas = new (class Job_Canvas { + #queue: Queue; + + constructor() { + this.#queue = new Queue("canvas", { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + this.setupScheduler(); + } + + private setupScheduler() { + void this.#queue + .upsertJobScheduler( + "heatmap", + { + every: 1000 * 60 * 5, // 5 minutes + }, + { + name: "heatmap", + } + ) + .then(() => { + Logger.info("canvas worker heatmap upserted job scheduler"); + }); + } + + startWorker() { + const worker = new Worker(this.#queue.name, this.processJob, { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + return worker; + } + + async processJob(job: Job) { + switch (job.name) { + case "heatmap": + await generateHeatmap(); + break; + } + } +})(); + +const generateHeatmap = async () => { + Logger.info("Generating heatmap..."); + const now = Date.now(); + + await Canvas.generateHeatmap(); + + Logger.info( + "Generated heatmap in " + + ((Date.now() - now) / 1000).toFixed(1) + + " seconds" + ); +}; -- GitLab From 3d3c200a566c421c871cf252c182b9f8084aed5c Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 22:28:28 -0700 Subject: [PATCH 07/12] logger colorize --- packages/server/src/lib/Logger.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/server/src/lib/Logger.ts b/packages/server/src/lib/Logger.ts index 28c1291..94e5456 100644 --- a/packages/server/src/lib/Logger.ts +++ b/packages/server/src/lib/Logger.ts @@ -34,7 +34,11 @@ const formatter = format.printf((options) => { const Winston = winston.createLogger({ level: process.env.LOG_LEVEL || "info", - format: format.combine(format.timestamp(), formatter), + format: format.combine( + format.colorize({ all: true }), + format.timestamp(), + formatter + ), transports: [new winston.transports.Console()], }); -- GitLab From 7e93b4323754449395b2b295e5a19a429d27cea6 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 22:43:08 -0700 Subject: [PATCH 08/12] use .env.local & tsx --- packages/server/package.json | 4 ++-- packages/server/tool.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/server/package.json b/packages/server/package.json index 707c987..baa50fb 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -2,7 +2,7 @@ "name": "@sc07-canvas/server", "version": "1.0.0", "scripts": { - "dev": "tsx watch -r dotenv/config src/index.ts", + "dev": "DOTENV_CONFIG_PATH=.env.local tsx watch -r dotenv/config src/index.ts", "start": "node --enable-source-maps dist/index.js", "profiler": "node --inspect=0.0.0.0:9229 --enable-source-maps dist/index.js", "build": "tsc", @@ -11,7 +11,7 @@ "prisma:studio": "dotenv -v BROWSER=none -e .env.local -- prisma studio", "prisma:migrate": "prisma migrate deploy", "prisma:seed:palette": "./tool.sh seed_palette", - "tool": "./tool.sh", + "tool": "dotenv -e .env.local -- ./tool.sh", "pretest": "if [ ! -f .env.test ]; then echo '.env.test not found'; exit 1; fi; dotenv -e .env.test -- prisma migrate reset --force", "test": "dotenv -e .env.test -- jest --coverage --detectOpenHandles" }, diff --git a/packages/server/tool.sh b/packages/server/tool.sh index 36e14bd..8be1ea2 100755 --- a/packages/server/tool.sh +++ b/packages/server/tool.sh @@ -42,5 +42,5 @@ shift; # remove first argument to tool.sh, so we can pass the rest to the tool if $USE_PROD; then node $PROD_TOOLS_ROOT/$TOOL_NAME.js "$@" else - npx ts-node --transpile-only $DEV_TOOLS_ROOT/$TOOL_NAME.ts "$@" + npx tsx $DEV_TOOLS_ROOT/$TOOL_NAME.ts "$@" fi \ No newline at end of file -- GitLab From 3d01d5513d59742e0d43d08a2ff52f6ca0bd3c04 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 22:51:00 -0700 Subject: [PATCH 09/12] run workers depending on environment --- packages/server/src/jobs/bullmq.ts | 44 +++++++++---------- packages/server/src/tools/start_job_worker.ts | 4 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index c0a6a76..2b43b82 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -1,34 +1,34 @@ -import { Queue } from "bullmq"; +import { Worker } from "bullmq"; import { getLogger } from "../lib/Logger"; import { Job_Canvas } from "./canvas"; import { Job_PixelStacking } from "./pixelStacking"; -const mainQueue = new Queue("main", { - prefix: "canvas_main_queue", - connection: { - url: process.env.REDIS_HOST, - }, -}); - const Logger = getLogger(); -{ - const worker = Job_PixelStacking.startWorker(); +const setupWorker = (worker: Worker) => { worker.on("ready", () => { - Logger.info("pixelstacking worker is ready"); - }); - worker.on("error", (err) => { - console.error("pixelstacking worker errored", err); + Logger.info(`${worker.name} is now ready`); }); -} -{ - const worker = Job_Canvas.startWorker(); - worker.on("ready", () => { - Logger.info("pixelstacking worker is ready"); - }); worker.on("error", (err) => { - console.error("pixelstacking worker errored", err); + console.error(`${worker.name} error`, err); }); -} +}; + +export const BullMQ_JobManager = new (class BullMQ_JobManager { + constructor() { + if ( + process.env.NODE_ENV === "development" && + !("PROHIBIT_JOB_WORKERS" in process.env) + ) { + Logger.debug("Starting workers for jobs on main thread"); + this.startWorkers(); + } + } + + startWorkers() { + setupWorker(Job_PixelStacking.startWorker()); + setupWorker(Job_Canvas.startWorker()); + } +})(); diff --git a/packages/server/src/tools/start_job_worker.ts b/packages/server/src/tools/start_job_worker.ts index 1632f34..60d63af 100644 --- a/packages/server/src/tools/start_job_worker.ts +++ b/packages/server/src/tools/start_job_worker.ts @@ -1,6 +1,6 @@ -import { Jobs } from "../jobs/Jobs"; +import { BullMQ_JobManager } from "../jobs/bullmq"; import { loadSettings } from "../lib/Settings"; loadSettings(true).then(() => { - new Jobs(); + BullMQ_JobManager.startWorkers(); }); -- GitLab From f037b5fc02687b0d7dce63c636df668f59a1a305 Mon Sep 17 00:00:00 2001 From: Grant Date: Sat, 15 Feb 2025 23:01:18 -0700 Subject: [PATCH 10/12] move online announcement to scheduler --- packages/server/src/jobs/bullmq.ts | 4 +-- .../jobs/{pixelStacking.ts => networking.ts} | 31 ++++++++++++++++--- packages/server/src/lib/SocketServer.ts | 9 ------ 3 files changed, 28 insertions(+), 16 deletions(-) rename packages/server/src/jobs/{pixelStacking.ts => networking.ts} (79%) diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index 2b43b82..eb0c786 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -2,7 +2,7 @@ import { Worker } from "bullmq"; import { getLogger } from "../lib/Logger"; import { Job_Canvas } from "./canvas"; -import { Job_PixelStacking } from "./pixelStacking"; +import { Job_Networking } from "./networking"; const Logger = getLogger(); @@ -28,7 +28,7 @@ export const BullMQ_JobManager = new (class BullMQ_JobManager { } startWorkers() { - setupWorker(Job_PixelStacking.startWorker()); + setupWorker(Job_Networking.startWorker()); setupWorker(Job_Canvas.startWorker()); } })(); diff --git a/packages/server/src/jobs/pixelStacking.ts b/packages/server/src/jobs/networking.ts similarity index 79% rename from packages/server/src/jobs/pixelStacking.ts rename to packages/server/src/jobs/networking.ts index 7a4a36a..90282bd 100644 --- a/packages/server/src/jobs/pixelStacking.ts +++ b/packages/server/src/jobs/networking.ts @@ -8,12 +8,12 @@ import { User } from "../models/User"; const Logger = getLogger(); -export const Job_PixelStacking = new (class Job_PixelStacking { +export const Job_Networking = new (class Job_Networking { #queue: Queue; #worker?: Worker; constructor() { - this.#queue = new Queue("pixel-stacking", { + this.#queue = new Queue("networking", { connection: { url: process.env.REDIS_HOST, }, @@ -31,14 +31,25 @@ export const Job_PixelStacking = new (class Job_PixelStacking { }, { name: "pixel-stacking", - data: { - test: true, - }, } ) .then(() => { Logger.info("pixel stacking upserted job scheduler"); }); + + void this.#queue + .upsertJobScheduler( + "online", + { + every: 1000 * 5, // 5 seconds + }, + { + name: "online", + } + ) + .then(() => { + Logger.info("online status upserted job scheduler"); + }); } startWorker() { @@ -56,6 +67,9 @@ export const Job_PixelStacking = new (class Job_PixelStacking { case "pixel-stacking": await runPixelStacking(); break; + case "online": + await runOnlineCount(); + break; } } @@ -108,3 +122,10 @@ const runPixelStacking = async () => { } } }; + +const runOnlineCount = async () => { + const sockets = await SocketServer.instance.io.sockets.fetchSockets(); + for (const socket of sockets) { + socket.emit("online", { count: sockets.length }); + } +}; diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index 603fecf..cc3f2fb 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -355,15 +355,6 @@ export class SocketServer { async setupMasterShard() { if (process.env.NODE_ENV === "test") return; - // online announcement event - setInterval(async () => { - // possible issue: this includes every connected socket, not user count - const sockets = await this.io.sockets.fetchSockets(); - for (const socket of sockets) { - socket.emit("online", { count: sockets.length }); - } - }, 5000); - const redis = await Redis.getClient("SUB"); redis.subscribe(Redis.key("channel_heatmap"), (message, _channel) => { this.io.to("sub:heatmap").emit("heatmap", message); -- GitLab From 7cf8f50d75a1128f60a4250622108917bd691c6d Mon Sep 17 00:00:00 2001 From: Grant Date: Sun, 16 Feb 2025 12:39:41 -0700 Subject: [PATCH 11/12] eslint ignore for existing cache workers --- packages/server/src/workers/canvas_cache.ts | 2 ++ packages/server/src/workers/worker.ts | 2 ++ 2 files changed, 4 insertions(+) diff --git a/packages/server/src/workers/canvas_cache.ts b/packages/server/src/workers/canvas_cache.ts index 6ae2979..94ad70c 100644 --- a/packages/server/src/workers/canvas_cache.ts +++ b/packages/server/src/workers/canvas_cache.ts @@ -85,6 +85,8 @@ const startWriteQueue = async () => { Redis.key("canvas_cache_write_queue", workerId) ); if (!item) { + // rewriting the workers system to use bullmq would be the solution to this + // eslint-disable-next-line no-restricted-globals setTimeout(() => { startWriteQueue(); }, 250); diff --git a/packages/server/src/workers/worker.ts b/packages/server/src/workers/worker.ts index d09c72d..f388964 100644 --- a/packages/server/src/workers/worker.ts +++ b/packages/server/src/workers/worker.ts @@ -161,6 +161,8 @@ export const callCacheWorker = (type: string, data: any) => { res(); clearTimeout(watchdog); }; + // rewriting the workers system to use bullmq would be the solution to this + // eslint-disable-next-line no-restricted-globals const watchdog = setTimeout(() => { Logger.error( `Callback for ${type} ${callbackId} has taken too long, is it dead?` -- GitLab From 3d7639353d32da17dca824b114ae3ec365e65601 Mon Sep 17 00:00:00 2001 From: Grant Date: Sun, 16 Feb 2025 12:48:31 -0700 Subject: [PATCH 12/12] remove console usages --- packages/server/src/jobs/bullmq.ts | 2 +- packages/server/src/lib/Logger.ts | 22 +++++++++++++++++++++- packages/server/src/utils/Palette.ts | 7 +++++-- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts index eb0c786..bb706ce 100644 --- a/packages/server/src/jobs/bullmq.ts +++ b/packages/server/src/jobs/bullmq.ts @@ -12,7 +12,7 @@ const setupWorker = (worker: Worker) => { }); worker.on("error", (err) => { - console.error(`${worker.name} error`, err); + Logger.error(`${worker.name} error`, { error: err }); }); }; diff --git a/packages/server/src/lib/Logger.ts b/packages/server/src/lib/Logger.ts index 94e5456..00de568 100644 --- a/packages/server/src/lib/Logger.ts +++ b/packages/server/src/lib/Logger.ts @@ -29,12 +29,32 @@ const formatter = format.printf((options) => { options.message + "", ]; - return parts.join("\t"); + let message = parts.join("\t"); + + if (process.env.NODE_ENV !== "production") { + // in production we assume sentry is active, which will catch the traces + + if ("stack" in options) { + // if not in production and the stack is passed, append it to the log line to aid in debugging + message += "\n" + options.stack; + } + + if ( + "error" in options && + options.error instanceof Error && + options.error.stack + ) { + message += "\n" + options.error.stack; + } + } + + return message; }); const Winston = winston.createLogger({ level: process.env.LOG_LEVEL || "info", format: format.combine( + format.errors({ stack: true }), format.colorize({ all: true }), format.timestamp(), formatter diff --git a/packages/server/src/utils/Palette.ts b/packages/server/src/utils/Palette.ts index 5876957..befb90c 100644 --- a/packages/server/src/utils/Palette.ts +++ b/packages/server/src/utils/Palette.ts @@ -1,7 +1,10 @@ import { PaletteColor } from "@prisma/client"; +import { getLogger } from "../lib/Logger"; import { prisma } from "../lib/prisma"; +const Logger = getLogger(); + export const Palette = new (class Palette { private palette: PaletteColor[] = []; @@ -16,10 +19,10 @@ export const Palette = new (class Palette { .findMany() .then((paletteColors) => { this.palette = paletteColors; - console.info(`Loaded ${paletteColors.length} pallete colors`); + Logger.info(`Loaded ${paletteColors.length} pallete colors`); }) .catch((e) => { - console.error("Failed to get pallete colors", e); + Logger.error("Failed to get pallete colors", { error: e }); }); } -- GitLab