mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-17 12:57:17 +00:00
Compare commits
2 Commits
queue
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b66140bfd2 | ||
|
|
45158ff45b |
@@ -29,6 +29,7 @@ export interface RebuildJobHandlers {
|
||||
|
||||
// Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order).
|
||||
const QUEUE_KEY = "rebuild-client-associations:queue";
|
||||
const QUEUED_SET_KEY = "rebuild-client-associations:queued";
|
||||
|
||||
// Distributed lock that serialises queue consumption to a single server instance
|
||||
// at a time. TTL is generous enough to cover a full batch of expensive rebuilds.
|
||||
@@ -54,11 +55,28 @@ class RedisRebuildQueue {
|
||||
}
|
||||
|
||||
try {
|
||||
const dedupeKey = `${job.type}:${job.id}`;
|
||||
const added = await redis.sadd(QUEUED_SET_KEY, dedupeKey);
|
||||
if (added === 0) {
|
||||
logger.debug(
|
||||
`Rebuild queue: skipped duplicate queued job ${job.type}:${job.id}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await redis.rpush(QUEUE_KEY, JSON.stringify(job));
|
||||
logger.debug(
|
||||
`Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)`
|
||||
);
|
||||
} catch (err) {
|
||||
await redis
|
||||
.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
|
||||
.catch((cleanupErr) =>
|
||||
logger.warn(
|
||||
`Rebuild queue: failed to cleanup dedupe key for ${job.type}:${job.id} after enqueue failure:`,
|
||||
cleanupErr
|
||||
)
|
||||
);
|
||||
logger.error(
|
||||
`Rebuild queue: failed to enqueue ${job.type}:${job.id}:`,
|
||||
err
|
||||
@@ -121,6 +139,17 @@ class RedisRebuildQueue {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Remove from dedupe set once dequeued so the same job
|
||||
// can be re-queued while this one is in progress.
|
||||
await redis
|
||||
.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
|
||||
.catch((cleanupErr) =>
|
||||
logger.warn(
|
||||
`Rebuild queue: failed to remove dedupe key for ${job.type}:${job.id} on dequeue:`,
|
||||
cleanupErr
|
||||
)
|
||||
);
|
||||
|
||||
logger.debug(
|
||||
`Rebuild queue: processing ${job.type}:${job.id}`
|
||||
);
|
||||
|
||||
@@ -187,6 +187,8 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true });
|
||||
// Generate unique node ID for this instance
|
||||
const NODE_ID = uuidv4();
|
||||
const REDIS_CHANNEL = "websocket_messages";
|
||||
const REDIS_DIRECT_BATCH_SIZE = 250;
|
||||
const REDIS_DIRECT_FLUSH_INTERVAL_MS = 10;
|
||||
|
||||
// Client tracking map (local to this node)
|
||||
const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
||||
@@ -197,6 +199,15 @@ const clientConfigVersions: Map<string, number> = new Map();
|
||||
// Recovery tracking
|
||||
let isRedisRecoveryInProgress = false;
|
||||
|
||||
interface RedisDirectBatchEntry {
|
||||
targetClientId: string;
|
||||
message: WSMessage;
|
||||
resolve: () => void;
|
||||
}
|
||||
|
||||
let pendingRedisDirectMessages: RedisDirectBatchEntry[] = [];
|
||||
let redisDirectFlushTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
// Helper to get map key
|
||||
const getClientMapKey = (clientId: string) => clientId;
|
||||
|
||||
@@ -207,6 +218,78 @@ const getNodeConnectionsKey = (nodeId: string, clientId: string) =>
|
||||
const getConfigVersionKey = (clientId: string) =>
|
||||
`ws:configVersion:${clientId}`;
|
||||
|
||||
const clearRedisDirectFlushTimer = (): void => {
|
||||
if (redisDirectFlushTimer) {
|
||||
clearTimeout(redisDirectFlushTimer);
|
||||
redisDirectFlushTimer = null;
|
||||
}
|
||||
};
|
||||
|
||||
const publishDirectBatch = async (
|
||||
entries: RedisDirectBatchEntry[]
|
||||
): Promise<void> => {
|
||||
const redisMessage: RedisMessage = {
|
||||
type: "direct-batch",
|
||||
messages: entries.map((entry) => ({
|
||||
targetClientId: entry.targetClientId,
|
||||
message: entry.message
|
||||
})),
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
|
||||
};
|
||||
|
||||
const flushPendingRedisDirectMessages = async (): Promise<void> => {
|
||||
clearRedisDirectFlushTimer();
|
||||
|
||||
if (pendingRedisDirectMessages.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const entries = pendingRedisDirectMessages;
|
||||
pendingRedisDirectMessages = [];
|
||||
|
||||
if (!redisManager.isRedisEnabled()) {
|
||||
entries.forEach((entry) => entry.resolve());
|
||||
return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < entries.length; i += REDIS_DIRECT_BATCH_SIZE) {
|
||||
const batch = entries.slice(i, i + REDIS_DIRECT_BATCH_SIZE);
|
||||
try {
|
||||
await publishDirectBatch(batch);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Failed to send batched direct messages via Redis, messages may be lost:",
|
||||
error
|
||||
);
|
||||
} finally {
|
||||
batch.forEach((entry) => entry.resolve());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const enqueueRedisDirectMessage = async (
|
||||
targetClientId: string,
|
||||
message: WSMessage
|
||||
): Promise<void> => {
|
||||
await new Promise<void>((resolve) => {
|
||||
pendingRedisDirectMessages.push({ targetClientId, message, resolve });
|
||||
|
||||
if (pendingRedisDirectMessages.length >= REDIS_DIRECT_BATCH_SIZE) {
|
||||
void flushPendingRedisDirectMessages();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!redisDirectFlushTimer) {
|
||||
redisDirectFlushTimer = setTimeout(() => {
|
||||
void flushPendingRedisDirectMessages();
|
||||
}, REDIS_DIRECT_FLUSH_INTERVAL_MS);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Initialize Redis subscription for cross-node messaging
|
||||
const initializeRedisSubscription = async (): Promise<void> => {
|
||||
if (!redisManager.isRedisEnabled()) return;
|
||||
@@ -227,7 +310,16 @@ const initializeRedisSubscription = async (): Promise<void> => {
|
||||
// Send to specific client on this node
|
||||
await sendToClientLocal(
|
||||
redisMessage.targetClientId,
|
||||
redisMessage.message
|
||||
redisMessage.message,
|
||||
{},
|
||||
redisMessage.message.configVersion
|
||||
);
|
||||
} else if (
|
||||
redisMessage.type === "direct-batch" &&
|
||||
redisMessage.messages
|
||||
) {
|
||||
await sendRedisDirectBatchToLocalClients(
|
||||
redisMessage.messages
|
||||
);
|
||||
} else if (redisMessage.type === "broadcast") {
|
||||
// Broadcast to all clients on this node except excluded
|
||||
@@ -503,7 +595,8 @@ const incrementClientConfigVersion = async (
|
||||
const sendToClientLocal = async (
|
||||
clientId: string,
|
||||
message: WSMessage,
|
||||
options: SendMessageOptions = {}
|
||||
options: SendMessageOptions = {},
|
||||
preResolvedConfigVersion?: number
|
||||
): Promise<boolean> => {
|
||||
const mapKey = getClientMapKey(clientId);
|
||||
const clients = connectedClients.get(mapKey);
|
||||
@@ -512,7 +605,8 @@ const sendToClientLocal = async (
|
||||
}
|
||||
|
||||
// Handle config version
|
||||
const configVersion = await getClientConfigVersion(clientId);
|
||||
const configVersion =
|
||||
preResolvedConfigVersion ?? (await getClientConfigVersion(clientId));
|
||||
|
||||
// Add config version to message
|
||||
const messageWithVersion = {
|
||||
@@ -545,6 +639,20 @@ const sendToClientLocal = async (
|
||||
return true;
|
||||
};
|
||||
|
||||
const sendRedisDirectBatchToLocalClients = async (
|
||||
entries: { targetClientId: string; message: WSMessage }[]
|
||||
): Promise<void> => {
|
||||
const jobs = entries.map((entry) =>
|
||||
sendToClientLocal(
|
||||
entry.targetClientId,
|
||||
entry.message,
|
||||
{},
|
||||
entry.message.configVersion
|
||||
)
|
||||
);
|
||||
await Promise.all(jobs);
|
||||
};
|
||||
|
||||
const broadcastToAllExceptLocal = async (
|
||||
message: WSMessage,
|
||||
excludeClientId?: string,
|
||||
@@ -607,23 +715,13 @@ const sendToClient = async (
|
||||
// Only send via Redis if the client is not connected locally and Redis is enabled
|
||||
if (!localSent && redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
const redisMessage: RedisMessage = {
|
||||
type: "direct",
|
||||
targetClientId: clientId,
|
||||
message: {
|
||||
...message,
|
||||
configVersion
|
||||
},
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(
|
||||
REDIS_CHANNEL,
|
||||
JSON.stringify(redisMessage)
|
||||
);
|
||||
await enqueueRedisDirectMessage(clientId, {
|
||||
...message,
|
||||
configVersion
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Failed to send message via Redis, message may be lost:",
|
||||
"Failed to queue batched direct message for Redis delivery, message may be lost:",
|
||||
error
|
||||
);
|
||||
// Continue execution - local delivery already attempted
|
||||
@@ -1109,6 +1207,8 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
|
||||
// Cleanup function for graceful shutdown
|
||||
const cleanup = async (): Promise<void> => {
|
||||
try {
|
||||
await flushPendingRedisDirectMessages();
|
||||
|
||||
// Close all WebSocket connections
|
||||
connectedClients.forEach((clients) => {
|
||||
clients.forEach((client) => {
|
||||
|
||||
@@ -76,12 +76,26 @@ export interface SendMessageOptions {
|
||||
compress?: boolean;
|
||||
}
|
||||
|
||||
// Redis message type for cross-node communication
|
||||
export interface RedisMessage {
|
||||
type: "direct" | "broadcast";
|
||||
targetClientId?: string;
|
||||
excludeClientId?: string;
|
||||
message: WSMessage;
|
||||
fromNodeId: string;
|
||||
options?: SendMessageOptions;
|
||||
}
|
||||
// Redis message types for cross-node communication
|
||||
export type RedisMessage =
|
||||
| {
|
||||
type: "direct";
|
||||
targetClientId: string;
|
||||
message: WSMessage;
|
||||
fromNodeId: string;
|
||||
}
|
||||
| {
|
||||
type: "direct-batch";
|
||||
messages: {
|
||||
targetClientId: string;
|
||||
message: WSMessage;
|
||||
}[];
|
||||
fromNodeId: string;
|
||||
}
|
||||
| {
|
||||
type: "broadcast";
|
||||
excludeClientId?: string;
|
||||
message: WSMessage;
|
||||
fromNodeId: string;
|
||||
options?: SendMessageOptions;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user