diff --git a/package-lock.json b/package-lock.json index e98548f6173a99047dff443bce783a7bf4a5acb9..5d0741f38572f7aae806a7cdc00c33f0098d5d80 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2965,6 +2965,11 @@ "@swc/helpers": "^0.5.0" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -3503,6 +3508,78 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz", + "integrity": "sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz", + "integrity": "sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz", + "integrity": "sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz", + "integrity": "sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz", + "integrity": "sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz", + "integrity": "sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@nextui-org/accordion": { "version": "2.2.7", "resolved": "https://registry.npmjs.org/@nextui-org/accordion/-/accordion-2.2.7.tgz", @@ -10165,6 +10242,20 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, + "node_modules/bullmq": { + "version": "5.40.2", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.40.2.tgz", + "integrity": "sha512-Cn4NUpwGAF4WnuXR2kTZCTAUEUHajSCn/IqiDG9ry1kVvAwwwg1Ati3J5HN2uZjqD5PBfNDXYnsc2+0PzakDwg==", + "dependencies": { + "cron-parser": "^4.9.0", + "ioredis": "^5.4.1", + "msgpackr": "^1.11.2", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", + "tslib": "^2.0.0", + "uuid": "^9.0.0" + } + }, "node_modules/bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -10700,6 +10791,17 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "devOptional": true }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -10873,6 +10975,14 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -13425,6 +13535,50 @@ "tslib": "2" } }, + "node_modules/ioredis": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.5.0.tgz", + "integrity": "sha512-7CutT89g23FfSa8MDoIFs2GYYa0PaNiW/OrT+nRyjRXHDZd17HmIgy+reOQ/yhh72NznNjGuS8kbCAcA4Ro4mw==", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/debug": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", + "integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ioredis/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -14842,6 +14996,16 @@ "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.memoize": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.memoize/-/lodash.memoize-4.1.2.tgz", @@ -14892,6 +15056,14 @@ "loose-envify": "cli.js" } }, + "node_modules/luxon": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz", + "integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==", + "engines": { + "node": ">=12" + } + }, "node_modules/magic-string": { "version": "0.30.8", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz", @@ -15096,6 +15268,35 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" }, + "node_modules/msgpackr": { + "version": "1.11.2", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.2.tgz", + "integrity": "sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==", + "optionalDependencies": { + "msgpackr-extract": "^3.0.2" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz", + "integrity": "sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.2.2" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3" + } + }, "node_modules/mz": { "version": "2.7.0", "resolved": "https://registry.npmjs.org/mz/-/mz-2.7.0.tgz", @@ -15152,6 +15353,11 @@ "react-dom": "^16.8 || ^17 || ^18 || ^19 || ^19.0.0-rc" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==" + }, "node_modules/node-addon-api": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-7.1.1.tgz", @@ -15180,6 +15386,29 @@ } } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", + "integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==", + "optional": true, + "dependencies": { + "detect-libc": "^2.0.1" + }, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, + "node_modules/node-gyp-build-optional-packages/node_modules/detect-libc": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.0.3.tgz", + "integrity": "sha512-bwy0MGW55bG41VqxxypOsdSdGqLwXPI/focwgTYCFMbdUiBAxLg9CFzG08sz2aqzknwiX7Hkl0bQENjg8iLByw==", + "optional": true, + "engines": { + "node": ">=8" + } + }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -16405,6 +16634,14 @@ "@redis/time-series": "1.1.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, "node_modules/redis-mock": { "version": "0.56.3", "resolved": "https://registry.npmjs.org/redis-mock/-/redis-mock-0.56.3.tgz", @@ -16415,6 +16652,17 @@ "node": ">=6" } }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.9.tgz", @@ -17332,6 +17580,11 @@ "node": ">=8" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", @@ -18907,6 +19160,18 @@ "node": ">= 0.4.0" } }, + "node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", @@ -19585,6 +19850,7 @@ "@sc07-canvas/lib": "^1.0.0", "@sentry/node": "^8.47.0", "body-parser": "^1.20.2", + "bullmq": "^5.40.2", "connect-redis": "^8.0.1", "cors": "^2.8.5", "express": "^4.21.2", diff --git a/packages/server/.eslintrc.json b/packages/server/.eslintrc.json index cf3c95e38ff53e8621a143a93d2a30dcba43b6fc..80e0728735e8a43c695ded11847de777c9570ac7 100644 --- a/packages/server/.eslintrc.json +++ b/packages/server/.eslintrc.json @@ -36,6 +36,17 @@ "varsIgnorePattern": "^_", "ignoreRestSiblings": true } + ], + "no-restricted-globals": [ + "error", + { + "name": "setInterval", + "message": "Avoid using timers. Consider using job queue." + }, + { + "name": "setTimeout", + "message": "Avoid using timers. Consider using job queue." + } ] } } diff --git a/packages/server/package.json b/packages/server/package.json index e0816979faa9e0036201d819fb45802496137c7f..baa50fb611ec10d0066a1f351863f551953de112 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -2,7 +2,7 @@ "name": "@sc07-canvas/server", "version": "1.0.0", "scripts": { - "dev": "tsx watch -r dotenv/config src/index.ts", + "dev": "DOTENV_CONFIG_PATH=.env.local tsx watch -r dotenv/config src/index.ts", "start": "node --enable-source-maps dist/index.js", "profiler": "node --inspect=0.0.0.0:9229 --enable-source-maps dist/index.js", "build": "tsc", @@ -11,7 +11,7 @@ "prisma:studio": "dotenv -v BROWSER=none -e .env.local -- prisma studio", "prisma:migrate": "prisma migrate deploy", "prisma:seed:palette": "./tool.sh seed_palette", - "tool": "./tool.sh", + "tool": "dotenv -e .env.local -- ./tool.sh", "pretest": "if [ ! -f .env.test ]; then echo '.env.test not found'; exit 1; fi; dotenv -e .env.test -- prisma migrate reset --force", "test": "dotenv -e .env.test -- jest --coverage --detectOpenHandles" }, @@ -43,6 +43,7 @@ "@sc07-canvas/lib": "^1.0.0", "@sentry/node": "^8.47.0", "body-parser": "^1.20.2", + "bullmq": "^5.40.2", "connect-redis": "^8.0.1", "cors": "^2.8.5", "express": "^4.21.2", diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 579cc5943b3a48d983b18f9df0f0d30cd8ea0b14..915304265104e27bd204004e8ad27dfac8409e46 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -2,6 +2,7 @@ import "./lib/sentry"; // load declare module import "./types"; import "./workers/worker"; +import "./jobs/bullmq"; import { ExpressServer } from "./lib/Express"; import { getLogger } from "./lib/Logger"; diff --git a/packages/server/src/jobs/Jobs.ts b/packages/server/src/jobs/Jobs.ts deleted file mode 100644 index a21e80a7600f8f2fc1b63d08d65caa0c8affab8d..0000000000000000000000000000000000000000 --- a/packages/server/src/jobs/Jobs.ts +++ /dev/null @@ -1,33 +0,0 @@ -import Canvas from "../lib/Canvas"; -import { getLogger } from "../lib/Logger"; - -const Logger = getLogger("JOB_WORKER"); - -/** - * Job scheduler - * - * This should run in a different process - */ -export class Jobs { - constructor() { - Logger.info("Starting job worker..."); - - // every 5 minutes - setInterval(this.generateHeatmap, 1000 * 60 * 5); - - this.generateHeatmap(); - } - - async generateHeatmap() { - Logger.info("Generating heatmap..."); - const now = Date.now(); - - await Canvas.generateHeatmap(); - - Logger.info( - "Generated heatmap in " + - ((Date.now() - now) / 1000).toFixed(1) + - " seconds" - ); - } -} diff --git a/packages/server/src/jobs/bullmq.ts b/packages/server/src/jobs/bullmq.ts new file mode 100644 index 0000000000000000000000000000000000000000..bb706ced1e1b29d836adfe8b6e894930435bc09a --- /dev/null +++ b/packages/server/src/jobs/bullmq.ts @@ -0,0 +1,34 @@ +import { Worker } from "bullmq"; + +import { getLogger } from "../lib/Logger"; +import { Job_Canvas } from "./canvas"; +import { Job_Networking } from "./networking"; + +const Logger = getLogger(); + +const setupWorker = (worker: Worker) => { + worker.on("ready", () => { + Logger.info(`${worker.name} is now ready`); + }); + + worker.on("error", (err) => { + Logger.error(`${worker.name} error`, { error: err }); + }); +}; + +export const BullMQ_JobManager = new (class BullMQ_JobManager { + constructor() { + if ( + process.env.NODE_ENV === "development" && + !("PROHIBIT_JOB_WORKERS" in process.env) + ) { + Logger.debug("Starting workers for jobs on main thread"); + this.startWorkers(); + } + } + + startWorkers() { + setupWorker(Job_Networking.startWorker()); + setupWorker(Job_Canvas.startWorker()); + } +})(); diff --git a/packages/server/src/jobs/canvas.ts b/packages/server/src/jobs/canvas.ts new file mode 100644 index 0000000000000000000000000000000000000000..c5035b1796bbe3fe9fe0dd4b5cbb73509e86752d --- /dev/null +++ b/packages/server/src/jobs/canvas.ts @@ -0,0 +1,70 @@ +import { Job, Queue, Worker } from "bullmq"; + +import Canvas from "../lib/Canvas"; +import { getLogger } from "../lib/Logger"; + +const Logger = getLogger(); + +/** + * all jobs that are for the canvas (like heatmaps) + */ +export const Job_Canvas = new (class Job_Canvas { + #queue: Queue; + + constructor() { + this.#queue = new Queue("canvas", { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + this.setupScheduler(); + } + + private setupScheduler() { + void this.#queue + .upsertJobScheduler( + "heatmap", + { + every: 1000 * 60 * 5, // 5 minutes + }, + { + name: "heatmap", + } + ) + .then(() => { + Logger.info("canvas worker heatmap upserted job scheduler"); + }); + } + + startWorker() { + const worker = new Worker(this.#queue.name, this.processJob, { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + return worker; + } + + async processJob(job: Job) { + switch (job.name) { + case "heatmap": + await generateHeatmap(); + break; + } + } +})(); + +const generateHeatmap = async () => { + Logger.info("Generating heatmap..."); + const now = Date.now(); + + await Canvas.generateHeatmap(); + + Logger.info( + "Generated heatmap in " + + ((Date.now() - now) / 1000).toFixed(1) + + " seconds" + ); +}; diff --git a/packages/server/src/jobs/networking.ts b/packages/server/src/jobs/networking.ts new file mode 100644 index 0000000000000000000000000000000000000000..90282bdbea8a2e80059cdf3d9ea09e3b3a44468f --- /dev/null +++ b/packages/server/src/jobs/networking.ts @@ -0,0 +1,131 @@ +import { CanvasLib } from "@sc07-canvas/lib"; +import { Job, Queue, Worker } from "bullmq"; + +import { getLogger } from "../lib/Logger"; +import { Redis } from "../lib/redis"; +import { getClientConfig, SocketServer } from "../lib/SocketServer"; +import { User } from "../models/User"; + +const Logger = getLogger(); + +export const Job_Networking = new (class Job_Networking { + #queue: Queue; + #worker?: Worker; + + constructor() { + this.#queue = new Queue("networking", { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + this.setupScheduler(); + } + + private setupScheduler() { + void this.#queue + .upsertJobScheduler( + "pixel-stacking", + { + every: 1000, + }, + { + name: "pixel-stacking", + } + ) + .then(() => { + Logger.info("pixel stacking upserted job scheduler"); + }); + + void this.#queue + .upsertJobScheduler( + "online", + { + every: 1000 * 5, // 5 seconds + }, + { + name: "online", + } + ) + .then(() => { + Logger.info("online status upserted job scheduler"); + }); + } + + startWorker() { + this.#worker = new Worker(this.#queue.name, this.processJob, { + connection: { + url: process.env.REDIS_HOST, + }, + }); + + return this.#worker!; + } + + async processJob(job: Job) { + switch (job.name) { + case "pixel-stacking": + await runPixelStacking(); + break; + case "online": + await runOnlineCount(); + break; + } + } + + get queue() { + return this.#queue; + } +})(); + +const runPixelStacking = async () => { + const DEBUG = false; + + if (DEBUG) Logger.debug("Running pixel stacking..."); + const redis = await Redis.getClient(); + const sockets = await SocketServer.instance.io.local.fetchSockets(); + + for (const socket of sockets) { + const sub = await redis.get(Redis.key("socketToSub", socket.id)); + if (!sub) { + if (DEBUG) Logger.warn(`Socket ${socket.id} has no user`); + continue; + } + + const user = await User.fromSub(sub); + if (!user) { + if (DEBUG) + Logger.warn( + `Socket ${socket.id}'s user (${sub}) does not exist in the database` + ); + continue; + } + + // time in seconds since last stack gain (including a potential bonus depending on previously remaining time) + const timeSinceLastPlace = + (Date.now() - user.lastTimeGainStarted.getTime()) / 1000; + const cooldown = CanvasLib.getPixelCooldown( + user.pixelStack + 1, + getClientConfig() + ); + + await user.update(true); + + // this impl has the side affect of giving previously offline users all the stack upon reconnecting + if ( + timeSinceLastPlace >= cooldown && + user.pixelStack < getClientConfig().canvas.pixel.maxStack + ) { + await user.modifyStack(1); + + if (DEBUG) Logger.debug(sub + " has gained another pixel in their stack"); + } + } +}; + +const runOnlineCount = async () => { + const sockets = await SocketServer.instance.io.sockets.fetchSockets(); + for (const socket of sockets) { + socket.emit("online", { count: sockets.length }); + } +}; diff --git a/packages/server/src/lib/Logger.ts b/packages/server/src/lib/Logger.ts index 28c12917f113879be6dc492c93c310c01e64a012..00de5689bfeed339f098019b520d91a0badff012 100644 --- a/packages/server/src/lib/Logger.ts +++ b/packages/server/src/lib/Logger.ts @@ -29,12 +29,36 @@ const formatter = format.printf((options) => { options.message + "", ]; - return parts.join("\t"); + let message = parts.join("\t"); + + if (process.env.NODE_ENV !== "production") { + // in production we assume sentry is active, which will catch the traces + + if ("stack" in options) { + // if not in production and the stack is passed, append it to the log line to aid in debugging + message += "\n" + options.stack; + } + + if ( + "error" in options && + options.error instanceof Error && + options.error.stack + ) { + message += "\n" + options.error.stack; + } + } + + return message; }); const Winston = winston.createLogger({ level: process.env.LOG_LEVEL || "info", - format: format.combine(format.timestamp(), formatter), + format: format.combine( + format.errors({ stack: true }), + format.colorize({ all: true }), + format.timestamp(), + formatter + ), transports: [new winston.transports.Console()], }); diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index 754a60f629bf924d878142091e8f9afd408e546f..cc3f2fbb85258dbe78f5cfa89b73f0d225ca0db3 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -1,6 +1,5 @@ import http from "node:http"; -import { CanvasLib } from "@sc07-canvas/lib"; import { ClientConfig, ClientToServerEvents, @@ -17,7 +16,7 @@ import { session } from "./Express"; import { getLogger } from "./Logger"; import { prisma } from "./prisma"; import { Recaptcha } from "./Recaptcha"; -import { Redis } from "./redis"; +import { LockExists, Redis } from "./redis"; const Logger = getLogger("SOCKET"); @@ -70,13 +69,6 @@ type Socket = RawSocket; export class SocketServer { static instance: SocketServer; io: Server; - /** - * Prevent users from time attacking pixel placements to place more pixels than stacked - * - * @key user sub (grant@grants.cafe) - * @value timestamp - */ - userPlaceLock = new Map(); constructor(server: http.Server) { SocketServer.instance = this; @@ -87,80 +79,6 @@ export class SocketServer { this.io.engine.use(session); this.io.on("connection", this.handleConnection.bind(this)); - - if (process.env.NODE_ENV !== "test") { - // clear pixel locks if they have existed for more than a minute - setInterval(() => { - const oneMinuteAgo = new Date(); - oneMinuteAgo.setMinutes(oneMinuteAgo.getMinutes() - 1); - - const expired = [...this.userPlaceLock.entries()].filter( - ([_user, time]) => time < oneMinuteAgo.getTime() - ); - - if (expired.length > 0) { - Logger.warn( - "A pixel lock has existed for too long for " + - expired.length + - " users : " + - expired.map((a) => a[0]).join(",") - ); - } - - for (const expire of expired) { - this.userPlaceLock.delete(expire[0]); - } - }, 1000 * 30); - - // pixel stacking - // - needs to be exponential (takes longer to aquire more pixels stacked) - // - convert to config options instead of hard-coded - setInterval(async () => { - const DEBUG = false; - - if (DEBUG) Logger.debug("Running pixel stacking..."); - const redis = await Redis.getClient(); - const sockets = await this.io.local.fetchSockets(); - - for (const socket of sockets) { - const sub = await redis.get(Redis.key("socketToSub", socket.id)); - if (!sub) { - if (DEBUG) Logger.warn(`Socket ${socket.id} has no user`); - continue; - } - - const user = await User.fromSub(sub); - if (!user) { - if (DEBUG) - Logger.warn( - `Socket ${socket.id}'s user (${sub}) does not exist in the database` - ); - continue; - } - - // time in seconds since last stack gain (including a potential bonus depending on previously remaining time) - const timeSinceLastPlace = - (Date.now() - user.lastTimeGainStarted.getTime()) / 1000; - const cooldown = CanvasLib.getPixelCooldown( - user.pixelStack + 1, - getClientConfig() - ); - - await user.update(true); - - // this impl has the side affect of giving previously offline users all the stack upon reconnecting - if ( - timeSinceLastPlace >= cooldown && - user.pixelStack < getClientConfig().canvas.pixel.maxStack - ) { - await user.modifyStack(1); - - if (DEBUG) - Logger.debug(sub + " has gained another pixel in their stack"); - } - } - }, 1000); - } } /** @@ -275,84 +193,86 @@ export class SocketServer { // force a user data update await user.update(true); - if (this.userPlaceLock.has(user.sub)) { - ack({ success: false, error: "pixel_already_pending" }); - return; - } + try { + await Redis.useLock("user_pixel", [user.sub], async () => { + if (bypassCooldown && !user.isModerator) { + // only moderators can do this + ack({ success: false, error: "invalid_pixel" }); + return; + } - this.userPlaceLock.set(user.sub, Date.now()); + if (!bypassCooldown && user.pixelStack < 1) { + ack({ success: false, error: "pixel_cooldown" }); + return; + } - if (bypassCooldown && !user.isModerator) { - // only moderators can do this - ack({ success: false, error: "invalid_pixel" }); - this.userPlaceLock.delete(user.sub); - return; - } + if ((user.getBan()?.expires || 0) > new Date()) { + ack({ success: false, error: "banned" }); + return; + } - if (!bypassCooldown && user.pixelStack < 1) { - ack({ success: false, error: "pixel_cooldown" }); - this.userPlaceLock.delete(user.sub); - return; - } + const paletteColor = await prisma.paletteColor.findFirst({ + where: { + id: pixel.color, + }, + }); + if (!paletteColor) { + ack({ + success: false, + error: "palette_color_invalid", + }); + return; + } - if ((user.getBan()?.expires || 0) > new Date()) { - ack({ success: false, error: "banned" }); - this.userPlaceLock.delete(user.sub); - return; - } + const pixelAtTheSameLocation = await Canvas.getPixel( + pixel.x, + pixel.y + ); - const paletteColor = await prisma.paletteColor.findFirst({ - where: { - id: pixel.color, - }, - }); - if (!paletteColor) { - ack({ - success: false, - error: "palette_color_invalid", - }); - this.userPlaceLock.delete(user.sub); - return; - } + if ( + pixelAtTheSameLocation && + pixelAtTheSameLocation.userId === user.sub && + pixelAtTheSameLocation.color === paletteColor.hex + ) { + ack({ success: false, error: "you_already_placed_that" }); + return; + } - const pixelAtTheSameLocation = await Canvas.getPixel(pixel.x, pixel.y); + Recaptcha.maybeChallenge(socket); - if ( - pixelAtTheSameLocation && - pixelAtTheSameLocation.userId === user.sub && - pixelAtTheSameLocation.color === paletteColor.hex - ) { - ack({ success: false, error: "you_already_placed_that" }); - this.userPlaceLock.delete(user.sub); + await user.modifyStack(-1); + await Canvas.setPixel( + user, + pixel.x, + pixel.y, + paletteColor.hex, + bypassCooldown + ); + // give undo capabilities + await user.setUndo( + new Date(Date.now() + Canvas.getCanvasConfig().undo.grace_period) + ); + + const newPixel: Pixel = { + x: pixel.x, + y: pixel.y, + color: pixel.color, + }; + ack({ + success: true, + data: newPixel, + }); + socket.broadcast.emit("pixel", newPixel); + }); + } catch (e) { + if (e instanceof LockExists) { + ack({ success: false, error: "pixel_already_pending" }); + } else { + ack({ success: false, error: "invalid_pixel" }); + throw e; + } return; } - - Recaptcha.maybeChallenge(socket); - - await user.modifyStack(-1); - await Canvas.setPixel( - user, - pixel.x, - pixel.y, - paletteColor.hex, - bypassCooldown - ); - // give undo capabilities - await user.setUndo( - new Date(Date.now() + Canvas.getCanvasConfig().undo.grace_period) - ); - - const newPixel: Pixel = { - x: pixel.x, - y: pixel.y, - color: pixel.color, - }; - ack({ - success: true, - data: newPixel, - }); - socket.broadcast.emit("pixel", newPixel); - this.userPlaceLock.delete(user.sub); }); socket.on("undo", async (ack) => { @@ -435,15 +355,6 @@ export class SocketServer { async setupMasterShard() { if (process.env.NODE_ENV === "test") return; - // online announcement event - setInterval(async () => { - // possible issue: this includes every connected socket, not user count - const sockets = await this.io.sockets.fetchSockets(); - for (const socket of sockets) { - socket.emit("online", { count: sockets.length }); - } - }, 5000); - const redis = await Redis.getClient("SUB"); redis.subscribe(Redis.key("channel_heatmap"), (message, _channel) => { this.io.to("sub:heatmap").emit("heatmap", message); diff --git a/packages/server/src/lib/redis.ts b/packages/server/src/lib/redis.ts index 6da750f15958d92234aeaa90b5704c7547d20f62..91b81cc246d9b2845bc811d48dc227650645dcb9 100644 --- a/packages/server/src/lib/redis.ts +++ b/packages/server/src/lib/redis.ts @@ -1,3 +1,4 @@ +import { User } from "@prisma/client"; import { RedisClientType } from "@redis/client"; import { createClient } from "redis"; @@ -23,6 +24,9 @@ interface IRedisKeys { // pub/sub channels channel_heatmap(): string; + + // locks + lock(id: string): string; } /** @@ -36,6 +40,7 @@ const RedisKeys: IRedisKeys = { canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`, socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`, channel_heatmap: () => `CANVAS:HEATMAP`, + lock: (id: string) => `CANVAS:LOCK:${id}`, }; class _Redis { @@ -126,6 +131,63 @@ class _Redis { ): (...params: Parameters) => string { return (...params) => this.key(key, ...params); } + + async acquireLock( + type: Type, + ...args: Parameters + ) { + const KEY = this.key("lock", type + "_" + args.join("_")); + const client = await this.getClient("MAIN"); + const resp = await client.set(KEY, Date.now(), { + EX: 60 * 1000, + NX: true, + }); + + if (!resp) { + Logger.debug("Lock acquire failed for " + KEY); + throw new LockExists(); + } + + Logger.debug("Lock acquired for " + KEY); + } + + async releaseLock( + type: Type, + ...args: Parameters + ) { + const KEY = this.key("lock", type + "_" + args.join("_")); + const client = await this.getClient("MAIN"); + const time = await client.getDel(KEY); + + if (!time) { + Logger.debug("Lock failed release for " + KEY + " (not existing)"); + return; + } + + Logger.debug( + "Lock released for " + KEY + " (" + (Date.now() - parseInt(time)) + "ms)" + ); + } + + async useLock( + type: Type, + args: Parameters, + use: (() => any) | (() => Promise) + ) { + await this.acquireLock(type, ...args); + await use(); + await this.releaseLock(type, ...args); + } +} + +interface ILocks { + user_pixel(userSub: User["sub"]): void; +} + +export class LockExists extends Error { + constructor() { + super("Lock already exists"); + } } export const Redis = new _Redis(RedisKeys); diff --git a/packages/server/src/tools/start_job_worker.ts b/packages/server/src/tools/start_job_worker.ts index 1632f3441798ec88a12182d769812cc31b906509..60d63af7652d7248c6a3a41d687b32157545530f 100644 --- a/packages/server/src/tools/start_job_worker.ts +++ b/packages/server/src/tools/start_job_worker.ts @@ -1,6 +1,6 @@ -import { Jobs } from "../jobs/Jobs"; +import { BullMQ_JobManager } from "../jobs/bullmq"; import { loadSettings } from "../lib/Settings"; loadSettings(true).then(() => { - new Jobs(); + BullMQ_JobManager.startWorkers(); }); diff --git a/packages/server/src/utils/Palette.ts b/packages/server/src/utils/Palette.ts index 5876957b94a77cfbf2c4a732824c28468be5a73a..befb90c7f7042092e7710ac15bcd3f3659bdab04 100644 --- a/packages/server/src/utils/Palette.ts +++ b/packages/server/src/utils/Palette.ts @@ -1,7 +1,10 @@ import { PaletteColor } from "@prisma/client"; +import { getLogger } from "../lib/Logger"; import { prisma } from "../lib/prisma"; +const Logger = getLogger(); + export const Palette = new (class Palette { private palette: PaletteColor[] = []; @@ -16,10 +19,10 @@ export const Palette = new (class Palette { .findMany() .then((paletteColors) => { this.palette = paletteColors; - console.info(`Loaded ${paletteColors.length} pallete colors`); + Logger.info(`Loaded ${paletteColors.length} pallete colors`); }) .catch((e) => { - console.error("Failed to get pallete colors", e); + Logger.error("Failed to get pallete colors", { error: e }); }); } diff --git a/packages/server/src/workers/canvas_cache.ts b/packages/server/src/workers/canvas_cache.ts index 6ae2979235a05f57f5393cf240128b1dd3141791..94ad70cddadf18b7c0d71fc9ddaa167ca98e1b18 100644 --- a/packages/server/src/workers/canvas_cache.ts +++ b/packages/server/src/workers/canvas_cache.ts @@ -85,6 +85,8 @@ const startWriteQueue = async () => { Redis.key("canvas_cache_write_queue", workerId) ); if (!item) { + // rewriting the workers system to use bullmq would be the solution to this + // eslint-disable-next-line no-restricted-globals setTimeout(() => { startWriteQueue(); }, 250); diff --git a/packages/server/src/workers/worker.ts b/packages/server/src/workers/worker.ts index d09c72d90683e92f37117734e4ada174415a7794..f388964dbe5ff785d169d28da306051e514a1c54 100644 --- a/packages/server/src/workers/worker.ts +++ b/packages/server/src/workers/worker.ts @@ -161,6 +161,8 @@ export const callCacheWorker = (type: string, data: any) => { res(); clearTimeout(watchdog); }; + // rewriting the workers system to use bullmq would be the solution to this + // eslint-disable-next-line no-restricted-globals const watchdog = setTimeout(() => { Logger.error( `Callback for ${type} ${callbackId} has taken too long, is it dead?` diff --git a/packages/server/tool.sh b/packages/server/tool.sh index 36e14bd6a1ac158c175bda00b3920ad072f36624..8be1ea2d8d5f88ccd94d31b67b50af25c0970cf2 100755 --- a/packages/server/tool.sh +++ b/packages/server/tool.sh @@ -42,5 +42,5 @@ shift; # remove first argument to tool.sh, so we can pass the rest to the tool if $USE_PROD; then node $PROD_TOOLS_ROOT/$TOOL_NAME.js "$@" else - npx ts-node --transpile-only $DEV_TOOLS_ROOT/$TOOL_NAME.ts "$@" + npx tsx $DEV_TOOLS_ROOT/$TOOL_NAME.ts "$@" fi \ No newline at end of file