Add missing queuing

This commit is contained in:
Owen
2026-06-22 16:47:52 -04:00
parent 6b56c00782
commit c3820a4e70
2 changed files with 118 additions and 61 deletions

View File

@@ -873,6 +873,20 @@ async function handleSubnetProxyTargetUpdates(
): Promise<void> { ): Promise<void> {
const proxyJobs: Promise<any>[] = []; const proxyJobs: Promise<any>[] = [];
const olmJobs: Promise<any>[] = []; const olmJobs: Promise<any>[] = [];
const targetsToAddBatch: {
newtId: string;
targets: NonNullable<
Awaited<ReturnType<typeof generateSubnetProxyTargetV2>>
>;
version: string | null;
}[] = [];
const targetsToRemoveBatch: {
newtId: string;
targets: NonNullable<
Awaited<ReturnType<typeof generateSubnetProxyTargetV2>>
>;
version: string | null;
}[] = [];
for (const siteData of sitesList) { for (const siteData of sitesList) {
const siteId = siteData.siteId; const siteId = siteData.siteId;
@@ -904,15 +918,11 @@ async function handleSubnetProxyTargetUpdates(
); );
if (targetsToAdd) { if (targetsToAdd) {
proxyJobs.push( targetsToAddBatch.push({
addSubnetProxyTargetsBatch([ newtId: newt.newtId,
{ targets: targetsToAdd,
newtId: newt.newtId, version: newt.version
targets: targetsToAdd, });
version: newt.version
}
])
);
} }
olmJobs.push( olmJobs.push(
@@ -945,15 +955,11 @@ async function handleSubnetProxyTargetUpdates(
); );
if (targetsToRemove) { if (targetsToRemove) {
proxyJobs.push( targetsToRemoveBatch.push({
removeSubnetProxyTargetsBatch([ newtId: newt.newtId,
{ targets: targetsToRemove,
newtId: newt.newtId, version: newt.version
targets: targetsToRemove, });
version: newt.version
}
])
);
} }
const peerDataRemovals: { const peerDataRemovals: {
@@ -1025,7 +1031,15 @@ async function handleSubnetProxyTargetUpdates(
} }
} }
await Promise.all(proxyJobs); if (targetsToAddBatch.length > 0) {
proxyJobs.push(addSubnetProxyTargetsBatch(targetsToAddBatch));
}
if (targetsToRemoveBatch.length > 0) {
proxyJobs.push(removeSubnetProxyTargetsBatch(targetsToRemoveBatch));
}
await Promise.all([...proxyJobs, ...olmJobs]);
} }
export async function rebuildClientAssociationsFromClient( export async function rebuildClientAssociationsFromClient(
@@ -2048,19 +2062,20 @@ export async function cleanupSiteAssociations(
// 7. Fire all removal messages in parallel. // 7. Fire all removal messages in parallel.
const jobs: Promise<any>[] = []; const jobs: Promise<any>[] = [];
const olmPeerDeletes: {
clientId: number;
siteId: number;
publicKey: string;
}[] = [];
for (const client of allClients) { for (const client of allClients) {
// Tell each olm to drop the site's WireGuard peer. // Tell each olm to drop the site's WireGuard peer.
if (site.publicKey) { if (site.publicKey) {
jobs.push( olmPeerDeletes.push({
olmDeletePeersBatch([ clientId: client.clientId,
{ siteId,
clientId: client.clientId, publicKey: site.publicKey
siteId, });
publicKey: site.publicKey
}
])
);
} }
// Recompute and push updated relay destinations (now excluding this site). // Recompute and push updated relay destinations (now excluding this site).
@@ -2069,6 +2084,10 @@ export async function cleanupSiteAssociations(
} }
} }
if (olmPeerDeletes.length > 0) {
jobs.push(olmDeletePeersBatch(olmPeerDeletes));
}
await Promise.all(jobs).catch((error) => { await Promise.all(jobs).catch((error) => {
logger.error( logger.error(
`cleanupSiteAssociations: error sending cleanup messages for siteId=${siteId}:`, `cleanupSiteAssociations: error sending cleanup messages for siteId=${siteId}:`,

View File

@@ -659,38 +659,52 @@ const broadcastToAllExceptLocal = async (
excludeClientId?: string, excludeClientId?: string,
options: SendMessageOptions = {} options: SendMessageOptions = {}
): Promise<void> => { ): Promise<void> => {
for (const [mapKey, clients] of connectedClients.entries()) { const sendPlans = await Promise.all(
const [type, id] = mapKey.split(":"); Array.from(connectedClients.entries()).map(
const clientId = mapKey; // mapKey is the clientId async ([mapKey, clients]) => {
if (!(excludeClientId && clientId === excludeClientId)) { const clientId = mapKey; // mapKey is the clientId
// Handle config version per client if (excludeClientId && clientId === excludeClientId) {
let configVersion = await getClientConfigVersion(clientId); return null;
if (options.incrementConfigVersion) { }
configVersion = await incrementClientConfigVersion(clientId);
}
// Add config version to message let configVersion = await getClientConfigVersion(clientId);
const messageWithVersion = { if (options.incrementConfigVersion) {
...message, configVersion =
configVersion await incrementClientConfigVersion(clientId);
}; }
if (options.compress) { return {
const compressed = zlib.gzipSync( clients,
Buffer.from(JSON.stringify(messageWithVersion), "utf8") messageWithVersion: {
); ...message,
clients.forEach((client) => { configVersion
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
} }
}); };
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(messageWithVersion));
}
});
} }
)
);
for (const plan of sendPlans) {
if (!plan) {
continue;
}
if (options.compress) {
const compressed = zlib.gzipSync(
Buffer.from(JSON.stringify(plan.messageWithVersion), "utf8")
);
plan.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
}
});
} else {
const messageString = JSON.stringify(plan.messageWithVersion);
plan.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
} }
} }
}; };
@@ -711,7 +725,12 @@ const sendToClient = async (
); );
// Try to send locally first // Try to send locally first
const localSent = await sendToClientLocal(clientId, message, options); const localSent = await sendToClientLocal(
clientId,
message,
options,
configVersion
);
// Only send via Redis if the client is not connected locally and Redis is enabled // Only send via Redis if the client is not connected locally and Redis is enabled
if (!localSent && redisManager.isRedisEnabled()) { if (!localSent && redisManager.isRedisEnabled()) {
@@ -745,15 +764,34 @@ const sendToClientsBatch = async (
} }
const remoteEntries: { targetClientId: string; message: WSMessage }[] = []; const remoteEntries: { targetClientId: string; message: WSMessage }[] = [];
const clientsWithIncrement = new Set(
entries
.filter((entry) => !!entry.options?.incrementConfigVersion)
.map((entry) => entry.clientId)
);
const nonIncrementOnlyClientIds = Array.from(
new Set(
entries
.map((entry) => entry.clientId)
.filter((clientId) => !clientsWithIncrement.has(clientId))
)
);
const stableConfigVersionByClient = new Map<string, number | undefined>(
await Promise.all(
nonIncrementOnlyClientIds.map(
async (clientId) =>
[clientId, await getClientConfigVersion(clientId)] as const
)
)
);
for (const entry of entries) { for (const entry of entries) {
const options = entry.options || {}; const options = entry.options || {};
const { clientId, message } = entry; const { clientId, message } = entry;
let configVersion = await getClientConfigVersion(clientId); const configVersion = options.incrementConfigVersion
if (options.incrementConfigVersion) { ? await incrementClientConfigVersion(clientId)
configVersion = await incrementClientConfigVersion(clientId); : stableConfigVersionByClient.get(clientId);
}
logger.debug( logger.debug(
`sendToClientsBatch: Message type ${message.type} queued for clientId ${clientId} (new configVersion: ${configVersion})` `sendToClientsBatch: Message type ${message.type} queued for clientId ${clientId} (new configVersion: ${configVersion})`