Commit 12606f3e authored by Grant's avatar Grant
Browse files
parents ec338e68 872343d1
Loading
Loading
Loading
Loading
+11 −21
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ import { Redis } from "./redis";
import { SocketServer } from "./SocketServer";
import { getLogger } from "./Logger";
import { Pixel } from "@prisma/client";
import { CanvasWorker } from "../workers/worker";
import { CanvasWorker, callWorkerMethod } from "../workers/worker";
import { LogMan } from "./LogMan";

const Logger = getLogger("CANVAS");
@@ -74,6 +74,8 @@ class Canvas {
   * @param height
   */
  async setSize(width: number, height: number, useStatic = false) {
    CanvasWorker.postMessage({ type: "canvasSize", width, height });

    if (useStatic) {
      this.canvasSize = [width, height];
      return;
@@ -237,33 +239,21 @@ class Canvas {
   * force an update at a specific position
   */
  async updateCanvasRedisAtPos(x: number, y: number) {
    const redis = await Redis.getClient();

    const pixels: string[] = (
      (await redis.get(Redis.key("canvas"))) || ""
    ).split(",");

    const dbpixel = await this.getPixel(x, y);

    pixels[this.canvasSize[0] * y + x] = dbpixel?.color || "transparent";

    await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
    await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", {
      x,
      y,
      hex: dbpixel?.color || "transparent",
    });
  }

  async updateCanvasRedisWithBatch(
    pixelBatch: { x: number; y: number; hex: string }[]
  ) {
    const redis = await Redis.getClient();

    const pixels: string[] = (
      (await redis.get(Redis.key("canvas"))) || ""
    ).split(",");

    for (const pixel of pixelBatch) {
      pixels[this.canvasSize[0] * pixel.y + pixel.x] = pixel.hex;
    }

    await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
    await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", {
      batch: pixelBatch,
    });
  }

  async isPixelArrayCached() {
+85 −6
Original line number Diff line number Diff line
@@ -2,11 +2,24 @@ import { parentPort } from "node:worker_threads";
import { Redis } from "../lib/redis";
import { prisma } from "../lib/prisma";
import { getLogger } from "../lib/Logger";
import { Pixel } from "@prisma/client";

type Message = {
type Message =
  | { type: "canvasSize"; width: number; height: number }
  | {
      type: "canvasToRedis";
  width: number;
  height: number;
    }
  | {
      type: "updateCanvasRedisAtPos";
      callbackId: number;
      x: number;
      y: number;
      hex: string | "transparent";
    }
  | {
      type: "updateCanvasRedisWithBatch";
      callbackId: number;
      batch: { x: number; y: number; hex: string }[];
    };

const Logger = getLogger("CANVAS_WORK");
@@ -19,16 +32,50 @@ redis.connect().then(() => {
  Logger.info("Connected to Redis");
});

const queuedCanvasRedis: {
  id: number;
  pixels: { x: number; y: number; hex: string | "transparent" }[];
}[] = [];

let canvasSize = { width: -1, height: -1 };

parentPort?.on("message", (msg: Message) => {
  switch (msg.type) {
    case "canvasSize":
      canvasSize = { width: msg.width, height: msg.height };
      Logger.info("Received canvasSize " + JSON.stringify(canvasSize));
      break;
    case "canvasToRedis":
      canvasToRedis(msg.width, msg.height).then((str) => {
      if (canvasSize.width === -1 || canvasSize.height === -1) {
        Logger.error("Received canvasToRedis but i do not have the dimentions");
        return;
      }

      canvasToRedis(canvasSize.width, canvasSize.height).then((str) => {
        parentPort?.postMessage({ type: "canvasToRedis", data: str });
      });
      break;
    case "updateCanvasRedisAtPos":
      queuedCanvasRedis.push({
        id: msg.callbackId,
        pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }],
      });
      startCanvasRedisIfNeeded();
      break;
    case "updateCanvasRedisWithBatch":
      queuedCanvasRedis.push({
        id: msg.callbackId,
        pixels: msg.batch,
      });
      startCanvasRedisIfNeeded();
      break;
  }
});

const execCallback = (id: number) => {
  parentPort?.postMessage({ type: "callback", callbackId: id });
};

/**
 * Convert database pixels to Redis cache
 *
@@ -76,3 +123,35 @@ const canvasToRedis = async (width: number, height: number) => {
  );
  return pixels;
};

let isCanvasRedisWorking = false;

const startCanvasRedisIfNeeded = () => {
  if (isCanvasRedisWorking) return;

  tickCanvasRedis();
};

const tickCanvasRedis = async () => {
  isCanvasRedisWorking = true;

  const item = queuedCanvasRedis.shift();
  if (!item) {
    isCanvasRedisWorking = false;
    return;
  }

  const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split(
    ","
  );

  for (const pixel of item.pixels) {
    pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex;
  }

  await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });

  execCallback(item.id);

  await tickCanvasRedis();
};
+28 −0
Original line number Diff line number Diff line
@@ -40,6 +40,34 @@ const AllWorkers = {

export const CanvasWorker = AllWorkers.canvas;

export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
  return new Promise<void>((res) => {
    const callbackId = Math.floor(Math.random() * 99999);
    Logger.info(`Calling worker method ${type} ${callbackId}`);

    const handleMessage = (message: {
      type: "callback";
      callbackId: number;
    }) => {
      if (message.type !== "callback") return;
      if (message.callbackId !== callbackId) return;

      Logger.info(`Finished worker call ${type} ${callbackId}`);
      res();

      worker.off("message", handleMessage);
    };

    worker.on("message", handleMessage);

    worker.postMessage({
      ...data,
      type,
      callbackId,
    });
  });
};

for (const [name, worker] of Object.entries(AllWorkers)) {
  worker.on("online", () => {
    Logger.info(`${name} worker is now online`);