Loading packages/server/src/lib/Canvas.ts +11 −21 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ import { Redis } from "./redis"; import { SocketServer } from "./SocketServer"; import { getLogger } from "./Logger"; import { Pixel } from "@prisma/client"; import { CanvasWorker } from "../workers/worker"; import { CanvasWorker, callWorkerMethod } from "../workers/worker"; import { LogMan } from "./LogMan"; const Logger = getLogger("CANVAS"); Loading Loading @@ -74,6 +74,8 @@ class Canvas { * @param height */ async setSize(width: number, height: number, useStatic = false) { CanvasWorker.postMessage({ type: "canvasSize", width, height }); if (useStatic) { this.canvasSize = [width, height]; return; Loading Loading @@ -237,33 +239,21 @@ class Canvas { * force an update at a specific position */ async updateCanvasRedisAtPos(x: number, y: number) { const redis = await Redis.getClient(); const pixels: string[] = ( (await redis.get(Redis.key("canvas"))) || "" ).split(","); const dbpixel = await this.getPixel(x, y); pixels[this.canvasSize[0] * y + x] = dbpixel?.color || "transparent"; await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", { x, y, hex: dbpixel?.color || "transparent", }); } async updateCanvasRedisWithBatch( pixelBatch: { x: number; y: number; hex: string }[] ) { const redis = await Redis.getClient(); const pixels: string[] = ( (await redis.get(Redis.key("canvas"))) || "" ).split(","); for (const pixel of pixelBatch) { pixels[this.canvasSize[0] * pixel.y + pixel.x] = pixel.hex; } await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", { batch: pixelBatch, }); } async isPixelArrayCached() { Loading packages/server/src/workers/canvas.ts +85 −6 Original line number Diff line number Diff line Loading @@ -2,11 +2,24 @@ import { parentPort } from "node:worker_threads"; import { Redis } from "../lib/redis"; import { prisma } from "../lib/prisma"; import { getLogger } from "../lib/Logger"; import { Pixel } from "@prisma/client"; type Message = { type Message = | { type: "canvasSize"; width: number; height: number } | { type: "canvasToRedis"; width: number; height: number; } | { type: "updateCanvasRedisAtPos"; callbackId: number; x: number; y: number; hex: string | "transparent"; } | { type: "updateCanvasRedisWithBatch"; callbackId: number; batch: { x: number; y: number; hex: string }[]; }; const Logger = getLogger("CANVAS_WORK"); Loading @@ -19,16 +32,50 @@ redis.connect().then(() => { Logger.info("Connected to Redis"); }); const queuedCanvasRedis: { id: number; pixels: { x: number; y: number; hex: string | "transparent" }[]; }[] = []; let canvasSize = { width: -1, height: -1 }; parentPort?.on("message", (msg: Message) => { switch (msg.type) { case "canvasSize": canvasSize = { width: msg.width, height: msg.height }; Logger.info("Received canvasSize " + JSON.stringify(canvasSize)); break; case "canvasToRedis": canvasToRedis(msg.width, msg.height).then((str) => { if (canvasSize.width === -1 || canvasSize.height === -1) { Logger.error("Received canvasToRedis but i do not have the dimentions"); return; } canvasToRedis(canvasSize.width, canvasSize.height).then((str) => { parentPort?.postMessage({ type: "canvasToRedis", data: str }); }); break; case "updateCanvasRedisAtPos": queuedCanvasRedis.push({ id: msg.callbackId, pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }], }); startCanvasRedisIfNeeded(); break; case "updateCanvasRedisWithBatch": queuedCanvasRedis.push({ id: msg.callbackId, pixels: msg.batch, }); startCanvasRedisIfNeeded(); break; } }); const execCallback = (id: number) => { parentPort?.postMessage({ type: "callback", callbackId: id }); }; /** * Convert database pixels to Redis cache * Loading Loading @@ -76,3 +123,35 @@ const canvasToRedis = async (width: number, height: number) => { ); return pixels; }; let isCanvasRedisWorking = false; const startCanvasRedisIfNeeded = () => { if (isCanvasRedisWorking) return; tickCanvasRedis(); }; const tickCanvasRedis = async () => { isCanvasRedisWorking = true; const item = queuedCanvasRedis.shift(); if (!item) { isCanvasRedisWorking = false; return; } const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split( "," ); for (const pixel of item.pixels) { pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex; } await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); execCallback(item.id); await tickCanvasRedis(); }; packages/server/src/workers/worker.ts +28 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,34 @@ const AllWorkers = { export const CanvasWorker = AllWorkers.canvas; export const callWorkerMethod = (worker: Worker, type: string, data: any) => { return new Promise<void>((res) => { const callbackId = Math.floor(Math.random() * 99999); Logger.info(`Calling worker method ${type} ${callbackId}`); const handleMessage = (message: { type: "callback"; callbackId: number; }) => { if (message.type !== "callback") return; if (message.callbackId !== callbackId) return; Logger.info(`Finished worker call ${type} ${callbackId}`); res(); worker.off("message", handleMessage); }; worker.on("message", handleMessage); worker.postMessage({ ...data, type, callbackId, }); }); }; for (const [name, worker] of Object.entries(AllWorkers)) { worker.on("online", () => { Logger.info(`${name} worker is now online`); Loading Loading
packages/server/src/lib/Canvas.ts +11 −21 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ import { Redis } from "./redis"; import { SocketServer } from "./SocketServer"; import { getLogger } from "./Logger"; import { Pixel } from "@prisma/client"; import { CanvasWorker } from "../workers/worker"; import { CanvasWorker, callWorkerMethod } from "../workers/worker"; import { LogMan } from "./LogMan"; const Logger = getLogger("CANVAS"); Loading Loading @@ -74,6 +74,8 @@ class Canvas { * @param height */ async setSize(width: number, height: number, useStatic = false) { CanvasWorker.postMessage({ type: "canvasSize", width, height }); if (useStatic) { this.canvasSize = [width, height]; return; Loading Loading @@ -237,33 +239,21 @@ class Canvas { * force an update at a specific position */ async updateCanvasRedisAtPos(x: number, y: number) { const redis = await Redis.getClient(); const pixels: string[] = ( (await redis.get(Redis.key("canvas"))) || "" ).split(","); const dbpixel = await this.getPixel(x, y); pixels[this.canvasSize[0] * y + x] = dbpixel?.color || "transparent"; await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", { x, y, hex: dbpixel?.color || "transparent", }); } async updateCanvasRedisWithBatch( pixelBatch: { x: number; y: number; hex: string }[] ) { const redis = await Redis.getClient(); const pixels: string[] = ( (await redis.get(Redis.key("canvas"))) || "" ).split(","); for (const pixel of pixelBatch) { pixels[this.canvasSize[0] * pixel.y + pixel.x] = pixel.hex; } await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", { batch: pixelBatch, }); } async isPixelArrayCached() { Loading
packages/server/src/workers/canvas.ts +85 −6 Original line number Diff line number Diff line Loading @@ -2,11 +2,24 @@ import { parentPort } from "node:worker_threads"; import { Redis } from "../lib/redis"; import { prisma } from "../lib/prisma"; import { getLogger } from "../lib/Logger"; import { Pixel } from "@prisma/client"; type Message = { type Message = | { type: "canvasSize"; width: number; height: number } | { type: "canvasToRedis"; width: number; height: number; } | { type: "updateCanvasRedisAtPos"; callbackId: number; x: number; y: number; hex: string | "transparent"; } | { type: "updateCanvasRedisWithBatch"; callbackId: number; batch: { x: number; y: number; hex: string }[]; }; const Logger = getLogger("CANVAS_WORK"); Loading @@ -19,16 +32,50 @@ redis.connect().then(() => { Logger.info("Connected to Redis"); }); const queuedCanvasRedis: { id: number; pixels: { x: number; y: number; hex: string | "transparent" }[]; }[] = []; let canvasSize = { width: -1, height: -1 }; parentPort?.on("message", (msg: Message) => { switch (msg.type) { case "canvasSize": canvasSize = { width: msg.width, height: msg.height }; Logger.info("Received canvasSize " + JSON.stringify(canvasSize)); break; case "canvasToRedis": canvasToRedis(msg.width, msg.height).then((str) => { if (canvasSize.width === -1 || canvasSize.height === -1) { Logger.error("Received canvasToRedis but i do not have the dimentions"); return; } canvasToRedis(canvasSize.width, canvasSize.height).then((str) => { parentPort?.postMessage({ type: "canvasToRedis", data: str }); }); break; case "updateCanvasRedisAtPos": queuedCanvasRedis.push({ id: msg.callbackId, pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }], }); startCanvasRedisIfNeeded(); break; case "updateCanvasRedisWithBatch": queuedCanvasRedis.push({ id: msg.callbackId, pixels: msg.batch, }); startCanvasRedisIfNeeded(); break; } }); const execCallback = (id: number) => { parentPort?.postMessage({ type: "callback", callbackId: id }); }; /** * Convert database pixels to Redis cache * Loading Loading @@ -76,3 +123,35 @@ const canvasToRedis = async (width: number, height: number) => { ); return pixels; }; let isCanvasRedisWorking = false; const startCanvasRedisIfNeeded = () => { if (isCanvasRedisWorking) return; tickCanvasRedis(); }; const tickCanvasRedis = async () => { isCanvasRedisWorking = true; const item = queuedCanvasRedis.shift(); if (!item) { isCanvasRedisWorking = false; return; } const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split( "," ); for (const pixel of item.pixels) { pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex; } await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); execCallback(item.id); await tickCanvasRedis(); };
packages/server/src/workers/worker.ts +28 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,34 @@ const AllWorkers = { export const CanvasWorker = AllWorkers.canvas; export const callWorkerMethod = (worker: Worker, type: string, data: any) => { return new Promise<void>((res) => { const callbackId = Math.floor(Math.random() * 99999); Logger.info(`Calling worker method ${type} ${callbackId}`); const handleMessage = (message: { type: "callback"; callbackId: number; }) => { if (message.type !== "callback") return; if (message.callbackId !== callbackId) return; Logger.info(`Finished worker call ${type} ${callbackId}`); res(); worker.off("message", handleMessage); }; worker.on("message", handleMessage); worker.postMessage({ ...data, type, callbackId, }); }); }; for (const [name, worker] of Object.entries(AllWorkers)) { worker.on("online", () => { Logger.info(`${name} worker is now online`); Loading