mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-23 07:41:50 +00:00
feat: batch redis ws direct messages and dedupe rebuild queue jobs
This commit is contained in:
committed by
Owen
parent
16abe98fd9
commit
d09668b20b
@@ -29,6 +29,7 @@ export interface RebuildJobHandlers {
|
||||
|
||||
// Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order).
|
||||
const QUEUE_KEY = "rebuild-client-associations:queue";
|
||||
const QUEUED_SET_KEY = "rebuild-client-associations:queued";
|
||||
|
||||
// Distributed lock that serialises queue consumption to a single server instance
|
||||
// at a time. TTL is generous enough to cover a full batch of expensive rebuilds.
|
||||
@@ -54,11 +55,23 @@ class RedisRebuildQueue {
|
||||
}
|
||||
|
||||
try {
|
||||
const dedupeKey = `${job.type}:${job.id}`;
|
||||
const added = await redis.sadd(QUEUED_SET_KEY, dedupeKey);
|
||||
if (added === 0) {
|
||||
logger.debug(
|
||||
`Rebuild queue: skipped duplicate queued job ${job.type}:${job.id}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await redis.rpush(QUEUE_KEY, JSON.stringify(job));
|
||||
logger.debug(
|
||||
`Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)`
|
||||
);
|
||||
} catch (err) {
|
||||
await redis
|
||||
?.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
|
||||
.catch(() => undefined);
|
||||
logger.error(
|
||||
`Rebuild queue: failed to enqueue ${job.type}:${job.id}:`,
|
||||
err
|
||||
@@ -121,6 +134,12 @@ class RedisRebuildQueue {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Remove from dedupe set once dequeued so the same job
|
||||
// can be re-queued while this one is in progress.
|
||||
await redis
|
||||
.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
|
||||
.catch(() => undefined);
|
||||
|
||||
logger.debug(
|
||||
`Rebuild queue: processing ${job.type}:${job.id}`
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user