Skip to content
import e from "express";
import client, { register } from "prom-client";
import { CACHE_WORKERS, getCacheWorkerQueueLength } from "../workers/worker";
import Canvas from "./Canvas";
import { prisma } from "./prisma";
import { Redis } from "./redis";
import { SocketServer } from "./SocketServer";
client.collectDefaultMetrics({
labels: process.env.NODE_APP_INSTANCE
? {
NODE_APP_INSTANCE: process.env.NODE_APP_INSTANCE,
}
: {},
});
export const PixelCount = new client.Gauge({
name: "pixel_count",
help: "total pixel count",
async collect() {
this.set(await prisma.pixel.count());
},
});
export const UserCount = new client.Gauge({
name: "user_count",
help: "total user count",
async collect() {
this.set(await prisma.user.count());
},
});
export const InstanceCount = new client.Gauge({
name: "instance_count",
help: "total number of unique instances",
async collect() {
this.set(await prisma.instance.count());
},
});
export const OnlineUsers = new client.Gauge({
name: "connected_count",
help: "total connected sockets",
async collect() {
this.set((await SocketServer.instance.io.fetchSockets()).length);
},
});
/**
* Rough estimate of filled pixels
*/
export const FilledPixels = new client.Gauge({
name: "filled_pixels",
help: "total number of filled pixels",
async collect() {
const [width, height] = Canvas.getCanvasConfig().size;
const filledPixels = await prisma.pixel.findMany({
where: {
x: {
gte: 0,
lt: width,
},
y: {
gte: 0,
lt: height,
},
isTop: true,
},
});
this.set(filledPixels.length);
},
});
export const TotalPixels = new client.Gauge({
name: "total_pixels",
help: "total number of pixels the canvas allows",
async collect() {
const [width, height] = Canvas.getCanvasConfig().size;
this.set(width * height);
},
});
new client.Gauge({
name: "cache_worker_callback_queue_main",
help: "cache worker callback queue length for main process",
collect() {
this.set(getCacheWorkerQueueLength());
},
});
new client.Gauge({
name: "cache_worker_queue_workers",
help: "cache worker write queue length per worker process",
labelNames: ["worker_id"],
async collect() {
const redis = await Redis.getClient();
for (let i = 0; i < CACHE_WORKERS; i++) {
this.set(
{
worker_id: i,
},
await redis.lLen(Redis.key("canvas_cache_write_queue", i))
);
}
},
});
export const handleMetricsEndpoint = async (
req: e.Request,
res: e.Response
) => {
if (!process.env.PROMETHEUS_TOKEN) {
res.status(500);
res.send("PROMETHEUS_TOKEN is not set.");
return;
}
if (req.headers.authorization !== "Bearer " + process.env.PROMETHEUS_TOKEN) {
res.status(401);
res.send("Invalid bearer token");
return;
}
res.setHeader("Content-Type", register.contentType);
res.send(await register.metrics());
res.end();
};
import rateLimit from "express-rate-limit";
import RedisStore from "rate-limit-redis";
import { Redis } from "./redis";
const REDIS_PREFIX = process.env.REDIS_RATELIMIT_PREFIX || "canvas_ratelimit:";
// TODO: Move these static settings to dynamic (related #101)
export const RateLimiter = {
ADMIN: rateLimit({
windowMs: 15 * 60 * 1000,
max: 15,
standardHeaders: true,
legacyHeaders: false,
skipSuccessfulRequests: true,
store: new RedisStore({
prefix: REDIS_PREFIX + "admin:",
sendCommand: async (...args: string[]) => {
const client = await Redis.getClient();
return await client.sendCommand(args);
},
}),
}),
HIGH: rateLimit({
windowMs: 15 * 60 * 1000,
max: 150,
standardHeaders: true,
legacyHeaders: false,
store: new RedisStore({
prefix: REDIS_PREFIX + "high:",
sendCommand: async (...args: string[]) => {
const client = await Redis.getClient();
return await client.sendCommand(args);
},
}),
}),
};
import {
ClientToServerEvents,
ServerToClientEvents,
} from "@sc07-canvas/lib/src/net";
import { Socket } from "socket.io";
import { User } from "../models/User";
import { getLogger } from "./Logger";
const Logger = getLogger("RECAPTCHA");
class Recaptcha_ {
disabled = false;
chance: number | null = null;
constructor() {
this.disabled =
!process.env.RECAPTCHA_SITE_KEY ||
!process.env.RECAPTCHA_SECRET_KEY ||
!process.env.RECAPTCHA_PIXEL_CHANCE;
if (!process.env.RECAPTCHA_PIXEL_CHANCE) {
Logger.warn("No RECAPTCHA_PIXEL_CHANCE set, captchas will not be sent!");
} else {
this.chance = parseFloat(process.env.RECAPTCHA_PIXEL_CHANCE);
if (this.chance > 1 || this.chance < 0) {
this.chance = null;
this.disabled = true;
Logger.warn("RECAPTCHA_PIXEL_CHANCE is not within (0<x<1)");
}
}
}
maybeChallenge(
socket: Socket<ClientToServerEvents, ServerToClientEvents>
): boolean {
if (this.disabled || !this.chance) return false;
if (Math.random() > this.chance) {
socket.emitWithAck("recaptcha_challenge").then((token) => {
this.verifyToken(token).then(async (data) => {
if (!data.success) {
this.notifyStaffOfError(data).then(() => {});
} else {
// if (data.score < 0.5 || true) {
// try {
// const user = (await User.fromAuthSession(
// socket.request.session.user!
// ))!;
// this.notifyStaff(user, data.score).then(() => {});
// } catch (e) {}
// }
}
});
});
return true;
}
return false;
}
async verifyToken(
token: string
): Promise<
| { success: true; challenge_ts: string; hostname: string; score: number }
| { success: false; "error-codes": string[] }
> {
return await fetch(
`https://www.google.com/recaptcha/api/siteverify?secret=${process.env.RECAPTCHA_SECRET_KEY!}&response=${token}`,
{
method: "POST",
}
).then((a) => a.json());
}
async notifyStaff(user: User, score: number) {
if (!process.env.DISCORD_WEBHOOK) return;
return await fetch(process.env.DISCORD_WEBHOOK, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
content: `User ${user.sub} got a low score ${score}`,
}),
});
}
async notifyStaffOfError(obj: any) {
if (!process.env.DISCORD_WEBHOOK) return;
return await fetch(process.env.DISCORD_WEBHOOK, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
content:
"Error while verifying captcha\n```\n" +
JSON.stringify(obj, null, 2) +
"\n```",
}),
});
}
}
export const Recaptcha = new Recaptcha_();
import Canvas from "./Canvas";
import { getLogger } from "./Logger";
import { prisma } from "./prisma";
const Logger = getLogger("SETTINGS");
export const loadSettings = async (frozen = false) => {
Logger.info("Loading settings...");
const sideEffects: Promise<unknown>[] = [];
// canvas size
const canvasSize = await prisma.setting.findFirst({
where: { key: "canvas.size" },
});
if (canvasSize) {
const data = JSON.parse(canvasSize.value);
Logger.info("Canvas size loaded as " + JSON.stringify(data));
sideEffects.push(
Canvas.setSize(data.width, data.height, frozen).then(() => {
Logger.info("Canvas size successfully updated");
})
);
} else {
Logger.warn("Setting canvas.size is not set, did you run init_settings?");
}
// canvas frozen
const canvasFrozen = await prisma.setting.findFirst({
where: { key: "canvas.frozen" },
});
if (canvasFrozen) {
const data = JSON.parse(canvasFrozen.value);
Logger.info(`Canvas frozen loaded as ${data}`);
Canvas.setFrozen(data);
}
Logger.info(
"Settings loaded into memory, waiting for side effects to finish..."
);
await Promise.allSettled(sideEffects);
};
import http from "node:http";
import { PaletteColor } from "@prisma/client";
import { CanvasLib } from "@sc07-canvas/lib";
import {
ClientConfig,
ClientToServerEvents,
Pixel,
ServerToClientEvents,
} from "@sc07-canvas/lib/src/net";
import { CanvasLib } from "@sc07-canvas/lib";
import { Server, Socket as RawSocket } from "socket.io";
import { session } from "./Express";
import { SHORT_HASH } from "../const";
import { User } from "../models/User";
import Canvas from "./Canvas";
import { PaletteColor } from "@prisma/client";
import { session } from "./Express";
import { getLogger } from "./Logger";
import { prisma } from "./prisma";
import { Logger } from "./Logger";
import { Recaptcha } from "./Recaptcha";
import { Redis } from "./redis";
import { User } from "../models/User";
const Logger = getLogger("SOCKET");
/**
* get socket.io server config, generated from environment vars
......@@ -53,22 +59,39 @@ prisma.paletteColor
Logger.error("Failed to get pallete colors", e);
});
const getClientConfig = (): ClientConfig => {
export const getClientConfig = (): ClientConfig => {
return {
version: SHORT_HASH,
pallete: {
colors: PALLETE,
pixel_cooldown: PIXEL_TIMEOUT_MS,
},
canvas: Canvas.getCanvasConfig(),
chat: {
enabled: true,
matrix_homeserver: process.env.MATRIX_HOMESERVER,
element_host: process.env.ELEMENT_HOST,
general_alias: process.env.MATRIX_GENERAL_ALIAS,
},
};
};
type Socket = RawSocket<ClientToServerEvents, ServerToClientEvents>;
export class SocketServer {
static instance: SocketServer;
io: Server<ClientToServerEvents, ServerToClientEvents>;
/**
* Prevent users from time attacking pixel placements to place more pixels than stacked
*
* @key user sub (grant@grants.cafe)
* @value timestamp
*/
userPlaceLock = new Map<string, number>();
constructor(server: http.Server) {
SocketServer.instance = this;
this.io = new Server(server, getSocketConfig());
this.setupMasterShard();
......@@ -76,6 +99,29 @@ export class SocketServer {
this.io.engine.use(session);
this.io.on("connection", this.handleConnection.bind(this));
// clear pixel locks if they have existed for more than a minute
setInterval(() => {
const oneMinuteAgo = new Date();
oneMinuteAgo.setMinutes(oneMinuteAgo.getMinutes() - 1);
const expired = [...this.userPlaceLock.entries()].filter(
([_user, time]) => time < oneMinuteAgo.getTime()
);
if (expired.length > 0) {
Logger.warn(
"A pixel lock has existed for too long for " +
expired.length +
" users : " +
expired.map((a) => a[0]).join(",")
);
}
for (const expire of expired) {
this.userPlaceLock.delete(expire[0]);
}
}, 1000 * 30);
// pixel stacking
// - needs to be exponential (takes longer to aquire more pixels stacked)
// - convert to config options instead of hard-coded
......@@ -102,17 +148,16 @@ export class SocketServer {
continue;
}
// time in seconds since last pixel placement
// TODO: this causes a mismatch between placement times
// - going from 0 stack to 6 stack has a steady increase between each
// - going from 3 stack to 6 stack takes longer
// time in seconds since last stack gain (including a potential bonus depending on previously remaining time)
const timeSinceLastPlace =
(Date.now() - user.lastPixelTime.getTime()) / 1000;
(Date.now() - user.lastTimeGainStarted.getTime()) / 1000;
const cooldown = CanvasLib.getPixelCooldown(
user.pixelStack + 1,
getClientConfig()
);
await user.update(true);
// this impl has the side affect of giving previously offline users all the stack upon reconnecting
if (
timeSinceLastPlace >= cooldown &&
......@@ -127,6 +172,15 @@ export class SocketServer {
}, 1000);
}
/**
* Broadcast config to all connected clients
*
* Used by canvas size updates
*/
broadcastConfig() {
this.io.emit("config", getClientConfig());
}
async handleConnection(socket: Socket) {
const user =
socket.request.session.user &&
......@@ -136,7 +190,19 @@ export class SocketServer {
);
user?.sockets.add(socket);
let ip = socket.handshake.address;
if (process.env.NODE_ENV === "production") {
if (typeof socket.handshake.headers["x-forwarded-for"] === "string") {
ip = socket.handshake.headers["x-forwarded-for"];
} else {
ip = socket.handshake.headers["x-forwarded-for"]?.[0] || ip;
}
}
user?.trackIP(ip);
Logger.debug("handleConnection " + user?.sockets.size);
socket.emit("clearCanvasChunks");
Redis.getClient().then((redis) => {
if (user) redis.set(Redis.key("socketToSub", socket.id), user.sub);
......@@ -149,13 +215,36 @@ export class SocketServer {
if (user) {
socket.emit("availablePixels", user.pixelStack);
socket.emit("pixelLastPlaced", user.lastPixelTime.getTime());
socket.emit("pixelLastPlaced", user.lastTimeGainStarted.getTime());
const ban = user.getBan();
socket.emit(
"standing",
ban
? {
banned: true,
until: ban.expires.toISOString(),
reason: ban.publicNote || undefined,
}
: { banned: false }
);
}
if (process.env.RECAPTCHA_SITE_KEY)
socket.emit("recaptcha", process.env.RECAPTCHA_SITE_KEY);
socket.emit("config", getClientConfig());
Canvas.getPixelsArray().then((pixels) => {
socket.emit("canvas", pixels);
});
{
Canvas.sendCanvasChunksToSocket(socket).then(() => {
socket.emit("alert", {
is: "toast",
action: "system",
severity: "success",
title: "Canvas loaded",
autoDismiss: true,
});
});
}
socket.on("disconnect", () => {
Logger.debug(`Socket ${socket.id} disconnected`);
......@@ -167,7 +256,12 @@ export class SocketServer {
});
});
socket.on("place", async (pixel, ack) => {
socket.on("place", async (pixel, bypassCooldown, ack) => {
if (getClientConfig().canvas.frozen) {
ack({ success: false, error: "canvas_frozen" });
return;
}
if (!user) {
ack({ success: false, error: "no_user" });
return;
......@@ -186,8 +280,29 @@ export class SocketServer {
// force a user data update
await user.update(true);
if (user.pixelStack < 1) {
if (this.userPlaceLock.has(user.sub)) {
ack({ success: false, error: "pixel_already_pending" });
return;
}
this.userPlaceLock.set(user.sub, Date.now());
if (bypassCooldown && !user.isModerator) {
// only moderators can do this
ack({ success: false, error: "invalid_pixel" });
this.userPlaceLock.delete(user.sub);
return;
}
if (!bypassCooldown && user.pixelStack < 1) {
ack({ success: false, error: "pixel_cooldown" });
this.userPlaceLock.delete(user.sub);
return;
}
if ((user.getBan()?.expires || 0) > new Date()) {
ack({ success: false, error: "banned" });
this.userPlaceLock.delete(user.sub);
return;
}
......@@ -201,11 +316,36 @@ export class SocketServer {
success: false,
error: "palette_color_invalid",
});
this.userPlaceLock.delete(user.sub);
return;
}
const pixelAtTheSameLocation = await Canvas.getPixel(pixel.x, pixel.y);
if (
pixelAtTheSameLocation &&
pixelAtTheSameLocation.userId === user.sub &&
pixelAtTheSameLocation.color === paletteColor.hex
) {
ack({ success: false, error: "you_already_placed_that" });
this.userPlaceLock.delete(user.sub);
return;
}
Recaptcha.maybeChallenge(socket);
await user.modifyStack(-1);
await Canvas.setPixel(user, pixel.x, pixel.y, paletteColor.hex);
await Canvas.setPixel(
user,
pixel.x,
pixel.y,
paletteColor.hex,
bypassCooldown
);
// give undo capabilities
await user.setUndo(
new Date(Date.now() + Canvas.getCanvasConfig().undo.grace_period)
);
const newPixel: Pixel = {
x: pixel.x,
......@@ -217,6 +357,74 @@ export class SocketServer {
data: newPixel,
});
socket.broadcast.emit("pixel", newPixel);
this.userPlaceLock.delete(user.sub);
});
socket.on("undo", async (ack) => {
if (getClientConfig().canvas.frozen) {
ack({ success: false, error: "canvas_frozen" });
return;
}
if (!user) {
ack({ success: false, error: "no_user" });
return;
}
await user.update(true);
if (!user.undoExpires) {
// user has no undo available
ack({ success: false, error: "unavailable" });
return;
}
const isExpired = user.undoExpires.getTime() - Date.now() < 0;
if (isExpired) {
// expiration date is in the past, so no undo is available
ack({ success: false, error: "unavailable" });
return;
}
// find most recent pixel
const pixel = await prisma.pixel.findFirst({
where: { userId: user.sub },
orderBy: { createdAt: "desc" },
});
if (!pixel) {
// user doesn't have a pixel, idk how we got here, but they can't do anything
ack({ success: false, error: "unavailable" });
return;
}
// delete most recent pixel
try {
await Canvas.undoPixel(pixel);
} catch (_e) {
ack({ success: false, error: "pixel_covered" });
return;
}
// mark the undo as used
await user.setUndo();
// give user pixel back
await user.modifyStack(1);
// trigger re-cache on redis
await Canvas.refreshPixel(pixel.x, pixel.y);
ack({ success: true, data: {} });
});
socket.on("subscribe", (topic) => {
socket.join("sub:" + topic);
});
socket.on("unsubscribe", (topic) => {
socket.leave("sub:" + topic);
});
}
......@@ -229,7 +437,7 @@ export class SocketServer {
*
* this does work with multiple socket.io instances, so this needs to only be executed by one shard
*/
setupMasterShard() {
async setupMasterShard() {
// online announcement event
setInterval(async () => {
// possible issue: this includes every connected socket, not user count
......@@ -238,5 +446,10 @@ export class SocketServer {
socket.emit("online", { count: sockets.length });
}
}, 5000);
const redis = await Redis.getClient("SUB");
redis.subscribe(Redis.key("channel_heatmap"), (message, _channel) => {
this.io.to("sub:heatmap").emit("heatmap", message);
});
}
}
import * as openid from "openid-client";
class OpenID_ {
config: openid.Configuration = {} as any;
async setup() {
if (process.env.INHIBIT_LOGIN) {
// eslint-disable-next-line no-console
console.warn(
"OpenID is not setup; INHIBIT_LOGIN environment variable set! Proceed with caution!"
);
return;
}
const { AUTH_ENDPOINT, AUTH_CLIENT, AUTH_SECRET } = process.env;
this.config = await openid.discovery(new URL(AUTH_ENDPOINT), AUTH_CLIENT, {
client_secret: AUTH_SECRET,
});
}
getRedirectUrl() {
return process.env.OIDC_CALLBACK_HOST + "/api/callback";
}
getAuthorizationURL() {
return openid
.buildAuthorizationUrl(this.config, {
redirect_uri: this.getRedirectUrl(),
prompt: "consent",
scope: "openid instance",
})
.toString();
}
exchangeToken(relativePath: string) {
return openid.authorizationCodeGrant(
this.config,
new URL(relativePath, process.env.OIDC_CALLBACK_HOST)
);
}
userInfo<Data extends object = object>(
accessToken: string,
expectedSub: string
): Promise<openid.UserInfoResponse & Data> {
return openid.fetchUserInfo(this.config, accessToken, expectedSub) as any;
}
}
export const OpenID = new OpenID_();
import { RedisClientType } from "@redis/client";
import { createClient } from "redis";
import { Logger } from "./Logger";
import { getLogger } from "./Logger";
const Logger = getLogger("REDIS");
/**
* Typedef for RedisKeys
*/
interface IRedisKeys {
// canvas
pixelColor(x: number, y: number): string;
canvas(): string;
// canvas(): string;
heatmap(): string;
canvas_section(
start: [x: number, y: number],
end: [x: number, y: number]
): string;
canvas_cache_write_queue(workerId: number): string;
// users
socketToSub(socketId: string): string;
// pub/sub channels
channel_heatmap(): string;
}
/**
* Defined as a variable due to boottime augmentation
*/
const RedisKeys: IRedisKeys = {
pixelColor: (x: number, y: number) => `CANVAS:PIXELS[${x},${y}]:COLOR`,
canvas: () => `CANVAS:PIXELS`,
// canvas: () => `CANVAS:PIXELS`,
heatmap: () => `CANVAS:HEATMAP`,
canvas_section: (start, end) =>
`CANVAS:PIXELS:${start.join(",")}:${end.join(",")}`,
canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`,
socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`,
channel_heatmap: () => `CANVAS:HEATMAP`,
};
class _Redis {
isConnecting = false;
isConnected = false;
client: RedisClientType;
sub_client: RedisClientType; // the client used for pubsub
waitingForConnect: ((...args: any) => any)[] = [];
keys: IRedisKeys;
......@@ -38,6 +57,9 @@ class _Redis {
this.client = createClient({
url: process.env.REDIS_HOST,
});
this.sub_client = createClient({
url: process.env.REDIS_HOST,
});
this.keys = keys;
}
......@@ -46,9 +68,18 @@ class _Redis {
if (this.isConnected)
throw new Error("Attempted to run Redis#connect when already connected");
this.isConnecting = true;
await this.client.connect();
Logger.info("Connected to Redis");
await this.sub_client.connect();
Logger.info(
`Connected to Redis, there's ${this.waitingForConnect.length} function(s) waiting for Redis`
);
this.isConnecting = false;
this.isConnected = true;
for (const func of this.waitingForConnect) {
func();
}
}
async disconnect() {
......@@ -62,12 +93,24 @@ class _Redis {
this.isConnected = false;
}
async getClient() {
async getClient(intent: "MAIN" | "SUB" = "MAIN") {
if (this.isConnecting) {
await (() =>
new Promise((res) => {
Logger.warn("getClient() called and is now pending in queue");
this.waitingForConnect.push(res);
}))();
}
if (!this.isConnected) {
await this.connect();
this.isConnected = true;
}
if (intent === "SUB") {
return this.sub_client;
}
return this.client;
}
......
import * as Sentry from "@sentry/node";
import { LONG_HASH } from "../const";
if (process.env.SENTRY_DSN) {
// only initialize sentry if environment variable is set
Sentry.init({
dsn: process.env.SENTRY_DSN,
release: LONG_HASH,
environment: process.env.SENTRY_ENVIRONMENT ?? "development",
tracesSampleRate: 1.0,
});
// ignore because this is related to error handling
// eslint-disable-next-line no-console
console.log("Sentry init with", process.env.SENTRY_DSN);
}
/**
* Create enum from array of strings
*
* @param values
* @returns
*/
export const createEnum = <T extends string>(values: T[]): { [k in T]: k } => {
const ret: { [k in T]: k } = {} as any;
for (const val of values) {
ret[val] = val;
}
return ret;
};
export type ConditionalPromise<
T,
UsePromise extends boolean = false,
> = UsePromise extends true ? Promise<T> : UsePromise extends false ? T : never;
import { AuditLog as AuditLogDB, Ban, User } from "@prisma/client";
import { prisma } from "../lib/prisma";
export class AuditLog {
static Factory(user: User | string | null) {
return new AuditLogFactory(user);
}
static async createEmpty(
user: User,
action: AuditLogDB["action"],
reason?: string
) {
return await prisma.auditLog.create({
data: {
userId: user.sub,
action,
reason,
},
});
}
}
class AuditLogFactory {
/**
* User who committed the action
*
* If null; the system did the action
*
* @nullable
*/
private _userId: string | null;
/**
* @required
*/
private _action?: AuditLogDB["action"];
private _reason: string | null = null;
private _comment: string | null = null;
/**
* Associated ban, if applicable
*/
private _ban?: Ban;
constructor(user: User | string | null) {
if (typeof user === "string" || user === null) {
this._userId = user;
} else {
this._userId = user.sub;
}
}
doing(action: AuditLogDB["action"]) {
this._action = action;
return this;
}
reason(reason: string | null) {
this._reason = reason;
return this;
}
/**
* Add comment from the service
* @param comment
* @returns
*/
withComment(comment: string | null) {
this._comment = comment;
return this;
}
withBan(ban: Ban) {
this._ban = ban;
return this;
}
async create() {
if (!this._action) {
throw new Error("Missing action");
}
return await prisma.auditLog.create({
data: {
action: this._action,
userId: this._userId || null,
reason: this._reason,
comment: this._comment,
banId: this._ban?.id,
},
});
}
}
import { Ban, Instance as InstanceDB } from "@prisma/client";
import { prisma } from "../lib/prisma";
export interface IInstanceMeta {
logo_uri?: string;
banner_uri?: string;
name?: string;
}
export class Instance {
private instance: InstanceDB;
private constructor(data: InstanceDB) {
this.instance = data;
}
get hostname() {
return this.instance.hostname;
}
/**
* Update Instance instance
*
* @throws InstanceNotFound Instance no longer exists (was deleted?)
*/
async update() {
const instance = await prisma.instance.findFirst({
where: {
id: this.instance.id,
},
});
if (!instance) throw new InstanceNotFound("Instance no longer exists");
this.instance = instance;
}
/**
* Get effective ban
*
* Filters through any subdomain bans
*/
async getEffectiveBan(): Promise<(Ban & { hostname: string }) | undefined> {
let applicable: Ban | undefined | null;
let hostname: string = this.instance.hostname;
const check = async (domain: string): Promise<any> => {
const instance = await Instance.fromDomain(domain);
hostname = domain;
applicable = await instance.getBan();
if (!applicable) {
const newDomain = domain.split(".").slice(1).join(".");
if (newDomain) {
return check(newDomain);
}
}
};
await check(this.instance.hostname);
return applicable
? {
...applicable,
hostname,
}
: undefined;
}
/**
* Get ban for this hostname
*
* @see Instance#getBans use this instead
*/
async getBan() {
const ban = await prisma.ban.findFirst({
where: {
instanceId: this.instance.id,
},
});
return ban;
}
/**
* Bans an instance (create / update)
*
* This bans all subdomains
*
* @note does not create audit log
* @note does not retroactively ban users, only blocks new users
*/
async ban(
expires: Date,
publicNote: string | null | undefined,
privateNote: string | null | undefined
) {
/*const subdomains = await Instance.getRegisteredSubdomains(
this.instance.hostname
);
const existing = await this.getBan();*/
const ban = await prisma.ban.upsert({
where: {
instanceId: this.instance.id,
},
create: {
instanceId: this.instance.id,
expiresAt: expires,
publicNote,
privateNote,
},
update: {
instanceId: this.instance.id,
expiresAt: expires,
publicNote,
privateNote,
},
});
return ban;
}
/**
* Unbans an instance
*
* @note does not create audit log
* @note does not unban a subdomain that was banned because of inheritance
* @throws InstanceNotBanned
*/
async unban() {
const existing = await this.getBan();
if (!existing) throw new InstanceNotBanned();
const ban = await prisma.ban.delete({
where: { id: existing.id },
});
return ban;
}
static async fromDomain(hostname: string): Promise<Instance> {
const instance = await prisma.instance.upsert({
where: {
hostname,
},
update: {},
create: {
hostname,
},
});
return new this(instance);
}
/**
* Get instance from hostname & update with new instance meta
* @param hostname
* @param instanceMeta
* @returns
*/
static async fromAuth(
hostname: string,
instanceMeta: IInstanceMeta
): Promise<Instance> {
if (!this.isHostnameValid(hostname)) {
throw new InstanceInvalid();
}
const instance = await prisma.instance.upsert({
where: {
hostname,
},
update: {
hostname,
name: instanceMeta.name,
logo_url: instanceMeta.logo_uri,
banner_url: instanceMeta.banner_uri,
},
create: {
hostname,
name: instanceMeta.name,
logo_url: instanceMeta.logo_uri,
banner_url: instanceMeta.banner_uri,
},
});
return new this(instance);
}
/**
* Get all registered subdomains from a domain
* @param hostname
*/
static async getRegisteredSubdomains(_hostname: string): Promise<Instance[]> {
return [];
}
/**
* Determine if a hostname is valid to be an instance
*
* Currently restricts the amount of domain parts
*
* @param hostname
* @returns
*/
static isHostnameValid(hostname: string): boolean {
return (hostname.match(/\./g) || []).length <= 5;
}
}
export class InstanceInvalid extends Error {
constructor() {
super();
this.name = "InstanceInvalid";
}
}
export class InstanceNotFound extends Error {
constructor(message?: string) {
super(message);
this.name = "InstanceNotFound";
}
}
export class InstanceNotBanned extends Error {
constructor() {
super();
this.name = "InstanceNotBanned";
}
}
import { Ban, User as UserDB } from "@prisma/client";
import { CanvasLib } from "@sc07-canvas/lib";
import {
AuthSession,
ClientToServerEvents,
IAlert,
ServerToClientEvents,
} from "@sc07-canvas/lib/src/net";
import { Socket } from "socket.io";
import { Logger } from "../lib/Logger";
import { getLogger } from "../lib/Logger";
import { prisma } from "../lib/prisma";
import { AuthSession } from "@sc07-canvas/lib/src/net";
import { getClientConfig } from "../lib/SocketServer";
import { ConditionalPromise } from "../lib/utils";
import { Instance } from "./Instance";
const Logger = getLogger();
interface IUserData {
sub: string;
lastPixelTime: Date;
pixelStack: number;
}
/**
* Represents a user ban
*
* Has implementation in here for making instance bans retroactive,
* but at time of writing, instance bans will only block new users
*/
export type IUserBan = {
id: number;
expires: Date;
publicNote: string | null;
} & (
| {
type: "user";
}
| {
type: "instance";
hostname: string;
}
);
export class User {
static instances: Map<string, User> = new Map();
sub: string;
lastPixelTime: Date;
lastTimeGainStarted: Date;
pixelStack: number;
authSession?: AuthSession;
undoExpires?: Date;
private _ban?: IUserBan;
sockets: Set<Socket> = new Set();
isAdmin: boolean;
isModerator: boolean;
sockets: Set<Socket<ClientToServerEvents, ServerToClientEvents>> = new Set();
private _updatedAt: number;
private constructor(data: IUserData) {
private constructor(data: UserDB & { Ban: Ban | null }) {
Logger.debug("User class instansiated for " + data.sub);
this.sub = data.sub;
this.lastPixelTime = data.lastPixelTime;
this.lastTimeGainStarted = data.lastTimeGainStarted;
this.pixelStack = data.pixelStack;
this.undoExpires = data.undoExpires || undefined;
this.isAdmin = data.isAdmin;
this.isModerator = data.isModerator;
this.updateBanFromUserData(data).then(() => {});
this._updatedAt = Date.now();
}
......@@ -38,30 +75,273 @@ export class User {
where: {
sub: this.sub,
},
include: {
Ban: true,
},
});
if (!userData) throw new UserNotFound();
this.lastPixelTime = userData.lastPixelTime;
this.lastTimeGainStarted = userData.lastTimeGainStarted;
this.pixelStack = userData.pixelStack;
this.undoExpires = userData.undoExpires || undefined;
this.isAdmin = userData.isAdmin;
this.isModerator = userData.isModerator;
await this.updateBanFromUserData(userData);
}
private async updateBanFromUserData(userData: UserDB & { Ban: Ban | null }) {
if (userData.Ban) {
this._ban = {
id: userData.Ban.id,
expires: userData.Ban.expiresAt,
publicNote: userData.Ban.publicNote,
type: "user",
};
} else {
// the code below is for making instance bans retroactive
//
// const instance = await this.getInstance();
// const instanceBan = await instance.getEffectiveBan();
// if (instanceBan) {
// this.ban = {
// id: instanceBan.id,
// expires: instanceBan.expiresAt,
// publicNote: instanceBan.publicNote,
// type: "instance",
// hostname: instanceBan.hostname,
// };
// }
}
}
async getInstance(): Promise<Instance> {
const [_local, hostname] = this.sub.split("@");
return await Instance.fromDomain(hostname);
}
async modifyStack(modifyBy: number): Promise<any> {
let new_date = new Date();
if (modifyBy > 0) {
let cooldown_to_add = 0.0;
for (let i = 0; i < modifyBy; i++) {
cooldown_to_add += CanvasLib.getPixelCooldown(
this.pixelStack + i + 1,
getClientConfig()
);
}
new_date = new Date(
this.lastTimeGainStarted.valueOf() + cooldown_to_add * 1000
);
} else if (modifyBy < 0) {
const cooldown_before_change_s = CanvasLib.getPixelCooldown(
this.pixelStack + 1,
getClientConfig()
);
const cooldown_after_change_s = CanvasLib.getPixelCooldown(
this.pixelStack + 1 + modifyBy,
getClientConfig()
);
const would_gain_next_at_timestamp_ms =
this.lastTimeGainStarted.valueOf() + cooldown_before_change_s * 1000;
const time_before_next =
would_gain_next_at_timestamp_ms - Date.now().valueOf();
// To avoid issue if a negative value is present for some reason
if (time_before_next > 0) {
if (time_before_next < cooldown_after_change_s * 1000) {
new_date = new Date(
Date.now() - cooldown_after_change_s * 1000 + time_before_next
);
}
}
}
const updatedUser = await prisma.user.update({
where: { sub: this.sub },
data: {
pixelStack: { increment: modifyBy },
lastTimeGainStarted: new_date,
},
});
for (const socket of this.sockets) {
socket.emit("availablePixels", updatedUser.pixelStack);
socket.emit("pixelLastPlaced", updatedUser.lastTimeGainStarted.getTime());
}
// we just modified the user data, so we should force an update
await this.update(true);
}
/**
* Set undoExpires in database and notify all user's sockets of undo ttl
*/
async setUndo(expires?: Date) {
if (expires) {
// expiration being set
await prisma.user.update({
where: { sub: this.sub },
data: {
undoExpires: expires,
},
});
for (const socket of this.sockets) {
socket.emit("undo", { available: true, expireAt: expires.getTime() });
}
} else {
// clear undo capability
await prisma.user.update({
where: { sub: this.sub },
data: {
undoExpires: undefined,
},
});
for (const socket of this.sockets) {
socket.emit("undo", { available: false });
}
}
await this.update(true);
}
/**
* Sends packet to all user's sockets with current standing information
*/
updateStanding() {
const ban = this.getBan();
if (ban) {
for (const socket of this.sockets) {
socket.emit("standing", {
banned: true,
until: ban.expires.toISOString(),
reason: ban.publicNote || undefined,
});
}
} else {
for (const socket of this.sockets) {
socket.emit("standing", { banned: false });
}
}
}
getBan<DoUpdate extends boolean = false>(
update: DoUpdate = false as DoUpdate
): ConditionalPromise<typeof this._ban, DoUpdate> {
if (update) {
return new Promise((res, rej) => {
prisma.user
.findFirst({
where: {
sub: this.sub,
},
include: {
Ban: true,
},
})
.then((user) => {
if (!user?.Ban) {
return res(undefined);
}
this._ban = {
type: "user",
id: user.Ban.id,
expires: user.Ban.expiresAt,
publicNote: user.Ban.publicNote,
};
res(this._ban);
})
.catch(rej);
}) as any;
} else {
return this._ban as any;
}
}
async ban(
expires: Date,
publicNote: string | null | undefined,
privateNote: string | null | undefined
) {
const ban = await prisma.ban.upsert({
where: {
userId: this.sub,
},
create: {
userId: this.sub,
expiresAt: expires,
publicNote,
privateNote,
},
update: {
userId: this.sub,
expiresAt: expires,
publicNote,
privateNote,
},
});
this._ban = {
id: ban.id,
type: "user",
expires,
publicNote: publicNote || null,
};
return ban;
}
async unban() {
const existing = await this.getBan(true);
if (!existing) throw new UserNotBanned();
const ban = await prisma.ban.delete({
where: { id: existing.id },
});
return ban;
}
/**
* Notifies all sockets for this user of a message
* @param alert
*/
notify(alert: IAlert) {
for (const socket of this.sockets) {
socket.emit("alert", alert);
}
}
async trackIP(ip: string) {
await prisma.iPAddress.upsert({
where: {
ip_userSub: {
ip,
userSub: this.sub,
},
},
create: {
ip,
userSub: this.sub,
lastUsedAt: new Date(),
},
update: {
ip,
userSub: this.sub,
lastUsedAt: new Date(),
},
});
}
/**
* Determine if this user data is stale and should be updated
* @see User#update
......@@ -94,6 +374,9 @@ export class User {
where: {
sub,
},
include: {
Ban: true,
},
});
if (!userData) throw new UserNotFound();
......@@ -110,3 +393,10 @@ export class UserNotFound extends Error {
this.name = "UserNotFound";
}
}
export class UserNotBanned extends Error {
constructor() {
super();
this.name = "UserNotBanned";
}
}
# src/tools
This directory contains scripts that can get executed outside of the main server
import Canvas from "../lib/Canvas";
import { Redis } from "../lib/redis";
const log = (...data: any) => {
// eslint-disable-next-line no-console
console.log(...data);
};
(async () => {
log("Caching pixels from database to Redis...");
await Canvas.pixelsToRedis();
await Redis.disconnect();
log("Cached");
})();
import { PrismaClient } from "@prisma/client";
const prisma = new PrismaClient();
// eslint-disable-next-line no-console
const log = (...msg: any[]) => console.log(...msg);
async function main() {
const SETTINGS: { key: string; defaultValue: any }[] = [
{
key: "canvas.size",
defaultValue: {
width: 100,
height: 100,
},
},
{
key: "canvas.frozen",
defaultValue: false,
},
];
for (const setting of SETTINGS) {
log("Ensuring setting", setting.key);
await prisma.setting.upsert({
where: { key: setting.key },
update: {},
create: {
key: setting.key,
value: JSON.stringify(setting.defaultValue),
},
});
}
}
main()
.then(async () => {
await prisma.$disconnect();
})
.catch(async (e) => {
// eslint-disable-next-line no-console
console.error(e);
await prisma.$disconnect();
process.exit(1);
});
......@@ -6,6 +6,8 @@ const prisma = new PrismaClient();
const log = (...msg: any[]) => console.log(...msg);
async function main() {
// pxls palette 13
// https://github.com/pxlsspace/Pxls/commit/1e0d85ddfc1258e6fc0ff9a0c1b1bff06cd9ee21
const palette: { name: string; hex: string }[] = [
{
name: "White",
......@@ -13,15 +15,19 @@ async function main() {
},
{
name: "Light Grey",
hex: "C2CBD4",
hex: "B9C3CF",
},
{
name: "Medium Grey",
hex: "858D98",
hex: "777F8C",
},
{
name: "Deep Grey",
hex: "4B4F58",
hex: "424651",
},
{
name: "Dark Grey",
hex: "1F1E26",
},
{
name: "Black",
......@@ -29,128 +35,137 @@ async function main() {
},
{
name: "Dark Chocolate",
hex: "38271D",
hex: "382215",
},
{
name: "Chocolate",
hex: "6C422C",
hex: "7C3F20",
},
{
name: "Brown",
hex: "BC7541",
hex: "C06F37",
},
{
name: "Peach",
hex: "FFB27F",
hex: "FEAD6C",
},
{
name: "Beige",
hex: "FFD68F",
hex: "FFD2B1",
},
{
name: "Pink",
hex: "FEB2D9",
hex: "FFA4D0",
},
{
name: "Magenta",
hex: "F854CF",
hex: "F14FB4",
},
{
name: "Mauve",
hex: "C785F3",
hex: "E973FF",
},
{
name: "Purple",
hex: "9C29BC",
hex: "A630D2",
},
{
name: "Dark Purple",
hex: "562972",
hex: "531D8C",
},
{
name: "Navy",
hex: "1E1E5B",
hex: "242367",
},
{
name: "Blue",
hex: "153FA2",
hex: "0334BF",
},
{
name: "Azure",
hex: "1C95DF",
hex: "149CFF",
},
{
name: "Aqua",
hex: "A0E8FF",
hex: "8DF5FF",
},
{
name: "Light Teal",
hex: "17A8A3",
hex: "01BFA5",
},
{
name: "Dark Teal",
hex: "226677",
hex: "16777E",
},
{
name: "Forest",
hex: "094C45",
hex: "054523",
},
{
name: "Dark Green",
hex: "278242",
hex: "18862F",
},
{
name: "Green",
hex: "43C91E",
hex: "61E021",
},
{
name: "Lime",
hex: "B7F954",
hex: "B1FF37",
},
{
name: "Pastel Yellow",
hex: "FFFFAF",
hex: "FFFFA5",
},
{
name: "Yellow",
hex: "FAE70F",
hex: "FDE111",
},
{
name: "Orange",
hex: "FEA815",
hex: "FF9F17",
},
{
name: "Rust",
hex: "EA5B15",
hex: "F66E08",
},
{
name: "Maroon",
hex: "5A0400",
hex: "550022",
},
{
name: "Rose",
hex: "990700",
hex: "99011A",
},
{
name: "Red",
hex: "D81515",
hex: "F30F0C",
},
{
name: "Watermelon",
hex: "FF635E",
hex: "FF7872",
},
];
for (const { name, hex } of palette) {
log("Ensuring color", { name, hex });
await prisma.paletteColor.upsert({
where: { hex },
update: {},
create: {
name,
hex,
},
});
if (process.argv?.[2] === "sql") {
log(`ALTER SEQUENCE "PaletteColor_id_seq" RESTART WITH 1;`);
for (const { name, hex } of palette) {
log(
`INSERT INTO "PaletteColor" (name, hex) VALUES ('${name}', '${hex}');`
);
}
} else {
for (const { name, hex } of palette) {
log("Ensuring color", { name, hex });
await prisma.paletteColor.upsert({
where: { hex },
update: {},
create: {
name,
hex,
},
});
}
}
}
......
import { Jobs } from "../jobs/Jobs";
import { loadSettings } from "../lib/Settings";
loadSettings(true).then(() => {
new Jobs();
});
import { AuthSession } from "@sc07-canvas/lib/src/net";
import type { Session } from "express-session";
import session from "express-session";
import { AuthSession } from "@sc07-canvas/lib/src/net";
declare module "express-session" {
interface SessionData {
......@@ -19,12 +19,24 @@ declare global {
namespace NodeJS {
interface ProcessEnv {
NODE_ENV: "development" | "production";
NODE_APP_INSTANCE?: string;
PORT: string;
LOG_LEVEL?: string;
SESSION_SECRET: string;
PROMETHEUS_TOKEN?: string;
REDIS_HOST: string;
REDIS_SESSION_PREFIX: string;
REDIS_SESSION_PREFIX?: string;
REDIS_RATELIMIT_PREFIX?: string;
/**
* hostname that is used in the callback
*
* @example http://localhost:3000
* @example https://canvas.com
*/
OIDC_CALLBACK_HOST: string;
/**
* If this is set, enable socket.io CORS to this origin
*
......@@ -44,6 +56,24 @@ declare global {
AUTH_ENDPOINT: string;
AUTH_CLIENT: string;
AUTH_SECRET: string;
MATRIX_HOMESERVER: string;
ELEMENT_HOST: string;
MATRIX_GENERAL_ALIAS: string;
PIXEL_LOG_PATH?: string;
RECAPTCHA_SITE_KEY?: string;
RECAPTCHA_SECRET_KEY?: string;
RECAPTCHA_PIXEL_CHANCE?: string;
DISCORD_WEBHOOK?: string;
CACHE_WORKERS?: string;
SENTRY_DSN?: string;
SENTRY_ENVIRONMENT?: string;
SENTRY_TUNNEL_PROJECT_IDS?: string;
}
}
}
/**
* Cache the contents of the database into redis keys
*
* Each cache chunk should aim be 100x100 pixels
*/
import { parentPort } from "node:worker_threads";
import { getLogger } from "../lib/Logger";
import { prisma } from "../lib/prisma";
import { Redis } from "../lib/redis";
// TODO: config maybe?
// <!> this value is hardcoded in #getCanvasSectionFromCoords
const canvasSectionSize = [100, 100];
type Message =
| { type: "id"; workerId: number }
| {
type: "cache";
start: [x: number, y: number];
end: [x: number, y: number];
callbackId: string;
}
| {
type: "write_pixel";
};
let Logger = getLogger("CANVAS_WORK");
/**
* We run the connection directly instead of via class functions to prevent side effects
*/
const redis = Redis.client;
redis.connect().then(() => {
Logger.info("Connected to Redis");
});
let workerId: number;
parentPort?.on("message", (msg: Message) => {
switch (msg.type) {
case "id":
workerId = msg.workerId;
Logger = getLogger("CANVAS_WORK", workerId);
Logger.info("Received worker ID assignment: " + workerId);
startWriteQueue().then(() => {});
break;
case "cache":
doCache(msg.start, msg.end).then(() => {
parentPort?.postMessage({
type: "callback",
callbackId: msg.callbackId,
});
});
break;
}
});
/**
* Get canvas section from coordinates
*
* @note This is hardcoded to expect the section size to be 100x100 pixels
*
* @param x
* @param y
*/
const getCanvasSectionFromCoords = (
x: number,
y: number
): { start: [x: number, y: number]; end: [x: number, y: number] } => {
// since we are assuming the section size is 100x100
// we can get the start position based on the hundreds position
const baseX = Math.floor((x % 1000) / 100); // get the hundreds
const baseY = Math.floor((y % 1000) / 100); // get the hundreds
return {
start: [baseX * 100, baseY * 100],
end: [baseX * 100 + 100, baseY * 100 + 100],
};
};
const startWriteQueue = async () => {
const item = await redis.lPop(
Redis.key("canvas_cache_write_queue", workerId)
);
if (!item) {
setTimeout(() => {
startWriteQueue();
}, 250);
return;
}
const x = parseInt(item.split(",")[0]);
const y = parseInt(item.split(",")[1]);
const color = item.split(",")[2];
const section = getCanvasSectionFromCoords(x, y);
const pixels: string[] = (
(await redis.get(
Redis.key("canvas_section", section.start, section.end)
)) || ""
).split(",");
const arrX = x - section.start[0];
const arrY = y - section.start[1];
pixels[canvasSectionSize[0] * arrY + arrX] = color;
await redis.set(
Redis.key("canvas_section", section.start, section.end),
pixels.join(",")
);
startWriteQueue();
};
const doCache = async (
start: [x: number, y: number],
end: [x: number, y: number]
) => {
const now = Date.now();
Logger.info(
"starting cache of section " + start.join(",") + " -> " + end.join(",")
);
const dbpixels = await prisma.pixel.findMany({
where: {
x: {
gte: start[0],
lt: end[0],
},
y: {
gte: start[1],
lt: end[1],
},
isTop: true,
},
});
const pixels: string[] = [];
// (y -> x) because of how the conversion needs to be done later
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
// fixes #24
for (let y = start[1]; y < end[1]; y++) {
for (let x = start[0]; x < end[0]; x++) {
pixels.push(
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
);
}
}
await redis.set(Redis.key("canvas_section", start, end), pixels.join(","));
Logger.info(
"finished cache of section " +
start.join(",") +
" -> " +
end.join(",") +
" in " +
((Date.now() - now) / 1000).toFixed(2) +
"s"
);
};
import crypto from "node:crypto";
import path from "node:path";
import { Worker, WorkerOptions } from "node:worker_threads";
import { getLogger } from "../lib/Logger";
const Logger = getLogger("WORKER_ROOT");
export const CACHE_WORKERS = process.env.CACHE_WORKERS
? parseInt(process.env.CACHE_WORKERS) || 1
: 1;
export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
if (process.env.NODE_ENV === "production") {
// when compiled we no longer need ts-node as it's already raw JS
// replace the file extension so it can load it directly
file = path.join(__dirname, file.replace(".ts", ".js"));
return new Worker(file, wkOpts);
} else {
// when in development we just have TS files
// this loads TS dynamically when the worker is created
// https://github.com/TypeStrong/ts-node/issues/676#issuecomment-531620154
wkOpts.eval = true;
if (!wkOpts.workerData) {
wkOpts.workerData = {};
}
wkOpts.workerData.__filename = path.join(__dirname, file);
return new Worker(
`
const wk = require('worker_threads');
require('ts-node').register();
let file = wk.workerData.__filename;
delete wk.workerData.__filename;
require(file);
`,
wkOpts
);
}
};
// not used as of right now
// dedicated worker threads for specific tasks would go here
const AllWorkers: { [k: string]: Worker } = {};
const cacheWorkers: Worker[] = [];
/**
* Return consistent worker ID for the specified coordinates
* @param x
* @param y
* @returns
*/
export const getCacheWorkerIdForCoords = (x: number, y: number): number => {
const key = (x + y) % cacheWorkers.length;
return key;
};
/**
* Return consistent worker for the specified coordinates
* @param x
* @param y
* @returns
*/
export const getCacheWorkerForCoords = (x: number, y: number): Worker => {
return cacheWorkers[getCacheWorkerIdForCoords(x, y)];
};
/**
* Spawns cache workers
*
* Promise resolves when all of them are alive
*
* @param num
*/
export const spawnCacheWorkers = async (num?: number): Promise<void> => {
if (typeof num === "undefined") {
// if the function isn't told, use the environment variables
num = CACHE_WORKERS;
}
Logger.info(`Spawning ${num} cache workers...`);
const pending: Promise<unknown>[] = [];
for (let i = 0; i < num; i++) {
const worker = spawnWorker("canvas_cache");
pending.push(
new Promise((res) => {
worker.on("online", () => {
Logger.info(`Canvas cache worker #${i} is now online`);
worker.postMessage({ type: "id", workerId: i });
res(undefined);
});
})
);
worker.on("error", (err) => {
Logger.error(`Canvas cache worker #${i} has errored`);
// eslint-disable-next-line no-console
console.error(err);
});
worker.on("exit", (exit) => {
Logger.warn(`Canvas cache worker #${i} has exited ${exit}`);
const index = cacheWorkers.indexOf(worker);
if (index > -1) {
cacheWorkers.splice(index, 1);
Logger.info(`Removed dead worker #${i} from pool`);
}
});
setupWorkerCallback(worker);
cacheWorkers.push(worker);
}
await Promise.allSettled(pending);
Logger.info(`Successfully spawned ${num} cache workers`);
};
export const getCanvasCacheWorker = () => {
return cacheWorkers[Math.floor(Math.random() * cacheWorkers.length)];
};
const cacheWorkerQueue: { [k: string]: () => any } = {};
/**
* Prometheus metrics
* @returns
*/
export const getCacheWorkerQueueLength = () =>
Object.keys(cacheWorkerQueue).length;
const setupWorkerCallback = (worker: Worker) => {
worker.on("message", (message: { type: "callback"; callbackId: number }) => {
if (message.type !== "callback") return;
const callback = cacheWorkerQueue[message.callbackId];
if (!callback) {
Logger.warn(
"Received callback message from worker, but no callbacks are waiting " +
message.callbackId
);
return;
}
callback();
delete cacheWorkerQueue[message.callbackId];
});
};
export const callCacheWorker = (type: string, data: any) => {
return new Promise<void>((res) => {
const callbackId = crypto.randomUUID();
cacheWorkerQueue[callbackId] = () => {
res();
clearTimeout(watchdog);
};
const watchdog = setTimeout(() => {
Logger.error(
`Callback for ${type} ${callbackId} has taken too long, is it dead?`
);
}, 10000);
const worker = getCanvasCacheWorker();
worker.postMessage({
...data,
type,
callbackId,
});
});
};
export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
return new Promise<void>((res) => {
const callbackId = Math.floor(Math.random() * 99999);
Logger.info(`Calling worker method ${type} ${callbackId}`);
const handleMessage = (message: {
type: "callback";
callbackId: number;
}) => {
if (message.type !== "callback") return;
if (message.callbackId !== callbackId) return;
Logger.info(`Finished worker call ${type} ${callbackId}`);
res();
worker.off("message", handleMessage);
};
worker.on("message", handleMessage);
worker.postMessage({
...data,
type,
callbackId,
});
});
};
for (const [name, worker] of Object.entries(AllWorkers)) {
worker.on("online", () => {
Logger.info(`${name} worker is now online`);
});
worker.on("exit", (exitCode) => {
Logger.warn(`${name} worker has exited ${exitCode}`);
});
worker.on("error", (err) => {
Logger.warn(`${name} worker has errored ${err.message}`);
// eslint-disable-next-line no-console
console.error(err);
});
}