// Worker-thread helpers: spawning workers and managing the canvas cache worker pool.
import { Worker, WorkerOptions } from "node:worker_threads";
import path from "node:path";
import { getLogger } from "../lib/Logger";
const Logger = getLogger("WORKER_ROOT");
/**
 * Parses a configured cache worker count.
 *
 * Falls back to 1 when the value is missing, empty, not a base-10 integer,
 * or smaller than 1 (a pool needs at least one worker).
 *
 * @param raw raw environment variable value
 * @returns a whole number >= 1
 */
const parseCacheWorkerCount = (raw: string | undefined): number => {
  // explicit radix so the value is always read as decimal
  const parsed = Number.parseInt(raw ?? "", 10);
  return Number.isNaN(parsed) || parsed < 1 ? 1 : parsed;
};

/**
 * Number of cache workers to spawn by default.
 * Read once from the `CACHE_WORKERS` environment variable when this module loads.
 */
export const CACHE_WORKERS = parseCacheWorkerCount(process.env.CACHE_WORKERS);
/**
 * Spawns a worker thread for a module located relative to this directory.
 *
 * In production the compiled `.js` file is loaded directly. Otherwise the
 * worker is started from an inline bootstrap script that registers ts-node
 * and then requires the TypeScript file.
 *
 * The options object passed by the caller is never modified.
 *
 * @param file worker module path, relative to this directory
 * @param wkOpts options forwarded to the Worker constructor
 * @returns the newly created worker
 */
export const spawnWorker = (
  file: string,
  wkOpts: WorkerOptions = {}
): Worker => {
  if (process.env.NODE_ENV === "production") {
    // when compiled we no longer need ts-node as it's already raw JS
    // only a trailing ".ts" is swapped so other parts of the path are left alone
    const compiledFile = path.join(__dirname, file.replace(/\.ts$/, ".js"));
    return new Worker(compiledFile, wkOpts);
  }

  // when in development we just have TS files
  // this loads TS dynamically when the worker is created
  // https://github.com/TypeStrong/ts-node/issues/676#issuecomment-531620154
  //
  // build fresh options instead of writing into the caller's object,
  // so reusing one options object for several workers is safe
  const devOpts: WorkerOptions = {
    ...wkOpts,
    eval: true,
    workerData: {
      ...wkOpts.workerData,
      // consumed (and deleted) by the bootstrap script below
      __filename: path.join(__dirname, file),
    },
  };

  return new Worker(
    `
      const wk = require('worker_threads');
      require('ts-node').register();
      let file = wk.workerData.__filename;
      delete wk.workerData.__filename;
      require(file);
    `,
    devOpts
  );
};
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// not used as of right now
// dedicated worker threads for specific tasks would go here
const AllWorkers: { [k: string]: Worker } = {};

// pool of canvas cache workers
let cacheWorkers: Worker[] = [];

/**
 * Return consistent worker ID for the specified coordinates
 *
 * The ID is an index into the cache worker pool, derived from `x + y`.
 * Negative or fractional sums are folded into the valid index range
 * instead of producing an out-of-range index. The mapping only stays
 * the same while the pool size stays the same.
 *
 * @param x
 * @param y
 * @returns index in `[0, pool size)`; `NaN` when the pool is empty
 */
export const getCacheWorkerIdForCoords = (x: number, y: number): number => {
  const poolSize = cacheWorkers.length;
  const sum = Math.floor(x + y);
  // `%` keeps the sign of the dividend, so shift negatives back into range
  return ((sum % poolSize) + poolSize) % poolSize;
};
/**
 * Look up the cache worker assigned to a pair of coordinates
 *
 * The pool index comes from getCacheWorkerIdForCoords.
 *
 * @param x
 * @param y
 * @returns the pool entry at that index
 */
export const getCacheWorkerForCoords = (x: number, y: number): Worker => {
  const workerId = getCacheWorkerIdForCoords(x, y);
  const assigned = cacheWorkers[workerId];
  return assigned;
};
/**
 * Spawns cache workers
 *
 * Promise resolves when every worker has either come online or exited,
 * so a worker that dies during startup cannot leave the caller waiting forever.
 *
 * Each worker is added to the pool as soon as it is created and is removed
 * from the pool again when it exits.
 *
 * @param num number of workers to spawn; defaults to CACHE_WORKERS
 */
export const spawnCacheWorkers = async (num?: number): Promise<void> => {
  if (typeof num === "undefined") {
    // if the function isn't told, use the environment variables
    num = CACHE_WORKERS;
  }

  Logger.info(`Spawning ${num} cache workers...`);

  // one entry per worker; resolves true once online, false if it exited first
  const pending: Promise<boolean>[] = [];

  for (let i = 0; i < num; i++) {
    const worker = spawnWorker("canvas_cache");

    pending.push(
      new Promise<boolean>((res) => {
        worker.once("online", () => {
          Logger.info(`Canvas cache worker #${i} is now online`);
          // tell the worker which slot it occupies
          worker.postMessage({ type: "id", workerId: i });
          res(true);
        });
        // settle even if the worker never comes online;
        // a second resolve after "online" is a no-op
        worker.once("exit", () => res(false));
      })
    );

    worker.on("error", (err) => {
      Logger.error(`Canvas cache worker #${i} has errored`);
      console.error(err);
    });

    worker.on("exit", (exit) => {
      Logger.warn(`Canvas cache worker #${i} has exited ${exit}`);
      const index = cacheWorkers.indexOf(worker);
      if (index > -1) {
        cacheWorkers.splice(index, 1);
        Logger.info(`Removed dead worker #${i} from pool`);
      }
    });

    setupWorkerCallback(worker);
    cacheWorkers.push(worker);
  }

  const results = await Promise.all(pending);
  const onlineCount = results.filter(Boolean).length;

  if (onlineCount === num) {
    Logger.info(`Successfully spawned ${num} cache workers`);
  } else {
    Logger.warn(`Only ${onlineCount} of ${num} cache workers came online`);
  }
};
/**
 * Picks a pseudo-random worker from the cache worker pool.
 * @returns the chosen worker; `undefined` when the pool is empty
 */
export const getCanvasCacheWorker = () => {
  const randomIndex = Math.floor(Math.random() * cacheWorkers.length);
  const chosen = cacheWorkers[randomIndex];
  return chosen;
};
// callbacks waiting on a cache worker reply, keyed by callback ID
let cacheWorkerQueue: Record<string, () => any> = {};

/**
 * Prometheus metrics
 * @returns how many callbacks are currently waiting in the queue
 */
export const getCacheWorkerQueueLength = () => {
  const waitingIds = Object.keys(cacheWorkerQueue);
  return waitingIds.length;
};
/**
 * Routes "callback" messages from a worker to the matching entry in
 * cacheWorkerQueue. Messages of any other type are ignored here.
 *
 * @param worker worker whose messages should be listened to
 */
const setupWorkerCallback = (worker: Worker) => {
  worker.on("message", (message: { type: "callback"; callbackId: number }) => {
    if (message.type !== "callback") return;

    const callback = cacheWorkerQueue[message.callbackId];
    if (!callback) {
      Logger.warn(
        "Received callback message from worker, but no callbacks are waiting " +
          message.callbackId
      );
      return;
    }

    // release the entry before running the callback so it is freed
    // even if the callback throws
    delete cacheWorkerQueue[message.callbackId];
    callback();
  });
};
/**
 * Sends a request to a randomly chosen cache worker and resolves once that
 * worker replies with a "callback" message carrying the same callback ID.
 *
 * An error is logged if no reply arrives within 10 seconds; the request
 * stays queued, so a late reply still resolves the promise.
 *
 * Rejects when no cache worker is available or the message cannot be posted.
 *
 * @param type message type understood by the worker
 * @param data extra fields merged into the posted message
 */
export const callCacheWorker = (type: string, data: any) => {
  return new Promise<void>((res, rej) => {
    const worker = getCanvasCacheWorker();
    if (!worker) {
      // nothing has been registered yet, so there is nothing to clean up
      rej(new Error(`No cache workers available to handle ${type}`));
      return;
    }

    // pick an ID that is not already waiting, so an in-flight request
    // is never overwritten
    let callbackId = Math.floor(Math.random() * 99999);
    while (callbackId in cacheWorkerQueue) {
      callbackId = Math.floor(Math.random() * 99999);
    }

    const watchdog = setTimeout(() => {
      Logger.error(
        `Callback for ${type} ${callbackId} has taken too long, is it dead?`
      );
    }, 10000);

    cacheWorkerQueue[callbackId] = () => {
      res();
      clearTimeout(watchdog);
    };

    try {
      worker.postMessage({
        ...data,
        type,
        callbackId,
      });
    } catch (err) {
      // the request never left, so drop its queue entry and timer
      clearTimeout(watchdog);
      delete cacheWorkerQueue[callbackId];
      rej(err);
    }
  });
};
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/**
 * Posts a request to the given worker and resolves when that worker answers
 * with a "callback" message carrying the same callback ID.
 *
 * The message listener is detached once the matching reply has been handled.
 *
 * @param worker worker to send the request to
 * @param type message type understood by the worker
 * @param data extra fields merged into the posted message
 */
export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
  return new Promise<void>((resolve) => {
    const callbackId = Math.floor(Math.random() * 99999);
    Logger.info(`Calling worker method ${type} ${callbackId}`);

    function onReply(reply: { type: "callback"; callbackId: number }) {
      // skip messages that are not the answer to this particular call
      const isOurReply =
        reply.type === "callback" && reply.callbackId === callbackId;
      if (!isOurReply) return;

      Logger.info(`Finished worker call ${type} ${callbackId}`);
      resolve();
      worker.off("message", onReply);
    }

    worker.on("message", onReply);

    const request = { ...data, type, callbackId };
    worker.postMessage(request);
  });
};
// attach lifecycle logging to every worker registered in AllWorkers
Object.entries(AllWorkers).forEach(([name, worker]) => {
  worker
    .on("online", () => {
      Logger.info(`${name} worker is now online`);
    })
    .on("exit", (exitCode) => {
      Logger.warn(`${name} worker has exited ${exitCode}`);
    })
    .on("error", (err) => {
      Logger.warn(`${name} worker has errored ${err.message}`);
      console.error(err);
    });
});