Add batch messaging functions to rebuild function

This commit is contained in:
Owen
2026-06-21 17:20:07 -04:00
parent 22ac711dc6
commit ee42846c90
7 changed files with 767 additions and 119 deletions

View File

@@ -1,4 +1,4 @@
import { sendToClient } from "#dynamic/routers/ws";
import { sendToClient, sendToClientsBatch } from "#dynamic/routers/ws";
import { db, newts, olms } from "@server/db";
import {
Alias,
@@ -8,7 +8,7 @@ import {
} from "@server/lib/ip";
import { canCompress } from "@server/lib/clientVersionChecks";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import { eq, inArray } from "drizzle-orm";
import semver from "semver";
const NEWT_V2_TARGETS_VERSION = ">=1.10.3";
@@ -59,6 +59,42 @@ export async function addTargets(
);
}
export async function addTargetsBatch(
entries: {
newtId: string;
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
version?: string | null;
}[]
) {
if (entries.length === 0) {
return;
}
const resolved = await Promise.all(
entries.map(async (entry) => ({
...entry,
targets: await convertTargetsIfNecessary(
entry.newtId,
entry.targets
)
}))
);
await sendToClientsBatch(
resolved.map((entry) => ({
clientId: entry.newtId,
message: {
type: `newt/wg/targets/add`,
data: entry.targets
},
options: {
incrementConfigVersion: true,
compress: canCompress(entry.version, "newt")
}
}))
);
}
export async function removeTargets(
newtId: string,
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
@@ -76,6 +112,42 @@ export async function removeTargets(
);
}
export async function removeTargetsBatch(
entries: {
newtId: string;
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
version?: string | null;
}[]
) {
if (entries.length === 0) {
return;
}
const resolved = await Promise.all(
entries.map(async (entry) => ({
...entry,
targets: await convertTargetsIfNecessary(
entry.newtId,
entry.targets
)
}))
);
await sendToClientsBatch(
resolved.map((entry) => ({
clientId: entry.newtId,
message: {
type: `newt/wg/targets/remove`,
data: entry.targets
},
options: {
incrementConfigVersion: true,
compress: canCompress(entry.version, "newt")
}
}))
);
}
export async function updateTargets(
newtId: string,
targets: {
@@ -201,6 +273,171 @@ export async function removePeerData(
});
}
const resolveOlmTargets = async (
entries: {
clientId: number;
olmId?: string;
version?: string | null;
}[]
) => {
const unresolvedClientIds = entries
.filter((entry) => !entry.olmId)
.map((entry) => entry.clientId);
const olmMap = new Map<number, { olmId: string; version: string | null }>();
if (unresolvedClientIds.length > 0) {
const olmRows = await db
.select({
clientId: olms.clientId,
olmId: olms.olmId,
version: olms.version
})
.from(olms)
.where(inArray(olms.clientId, unresolvedClientIds));
for (const row of olmRows) {
if (row.clientId !== null) {
olmMap.set(row.clientId, {
olmId: row.olmId,
version: row.version
});
}
}
}
return entries
.map((entry) => {
if (entry.olmId) {
return {
clientId: entry.clientId,
olmId: entry.olmId,
version: entry.version
};
}
const resolved = olmMap.get(entry.clientId);
if (!resolved) {
return null;
}
return {
clientId: entry.clientId,
olmId: resolved.olmId,
version: entry.version ?? resolved.version
};
})
.filter((entry) => entry !== null);
};
export async function addPeerDataBatch(
entries: {
clientId: number;
siteId: number;
remoteSubnets: string[];
aliases: Alias[];
olmId?: string;
version?: string | null;
}[]
) {
if (entries.length === 0) {
return;
}
const resolvedTargets = await resolveOlmTargets(entries);
if (resolvedTargets.length === 0) {
return;
}
const payloads = entries
.map((entry) => {
const resolved = resolvedTargets.find(
(target) => target.clientId === entry.clientId
);
if (!resolved) {
return null;
}
return {
clientId: resolved.olmId,
message: {
type: `olm/wg/peer/data/add`,
data: {
siteId: entry.siteId,
remoteSubnets: entry.remoteSubnets,
aliases: entry.aliases
}
},
options: {
incrementConfigVersion: true,
compress: canCompress(resolved.version, "olm")
}
};
})
.filter((entry) => entry !== null);
if (payloads.length === 0) {
return;
}
await sendToClientsBatch(payloads);
}
export async function removePeerDataBatch(
entries: {
clientId: number;
siteId: number;
remoteSubnets: string[];
aliases: Alias[];
olmId?: string;
version?: string | null;
}[]
) {
if (entries.length === 0) {
return;
}
const resolvedTargets = await resolveOlmTargets(entries);
if (resolvedTargets.length === 0) {
return;
}
const payloads = entries
.map((entry) => {
const resolved = resolvedTargets.find(
(target) => target.clientId === entry.clientId
);
if (!resolved) {
return null;
}
return {
clientId: resolved.olmId,
message: {
type: `olm/wg/peer/data/remove`,
data: {
siteId: entry.siteId,
remoteSubnets: entry.remoteSubnets,
aliases: entry.aliases
}
},
options: {
incrementConfigVersion: true,
compress: canCompress(resolved.version, "olm")
}
};
})
.filter((entry) => entry !== null);
if (payloads.length === 0) {
return;
}
await sendToClientsBatch(payloads);
}
export async function updatePeerData(
clientId: number,
siteId: number,

View File

@@ -1,7 +1,7 @@
import { db, Site } from "@server/db";
import { newts, sites } from "@server/db";
import { eq } from "drizzle-orm";
import { sendToClient } from "#dynamic/routers/ws";
import { sendToClient, sendToClientsBatch } from "#dynamic/routers/ws";
import logger from "@server/logger";
export async function addPeer(
@@ -36,10 +36,14 @@ export async function addPeer(
newtId = newt.newtId;
}
await sendToClient(newtId, {
type: "newt/wg/peer/add",
data: peer
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
newtId,
{
type: "newt/wg/peer/add",
data: peer
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -76,12 +80,16 @@ export async function deletePeer(
newtId = newt.newtId;
}
await sendToClient(newtId, {
type: "newt/wg/peer/remove",
data: {
publicKey
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
newtId,
{
type: "newt/wg/peer/remove",
data: {
publicKey
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -90,6 +98,35 @@ export async function deletePeer(
return site;
}
export async function deletePeersBatch(
peers: {
siteId: number;
publicKey: string;
newtId: string;
}[]
) {
if (peers.length === 0) {
return;
}
await sendToClientsBatch(
peers.map((peer) => ({
clientId: peer.newtId,
message: {
type: "newt/wg/peer/remove",
data: {
publicKey: peer.publicKey
}
},
options: { incrementConfigVersion: true }
}))
).catch((error) => {
logger.warn(`Error sending batched newt peer removals:`, error);
});
logger.info(`Deleted ${peers.length} peer(s) from newts (batch)`);
}
export async function updatePeer(
siteId: number,
publicKey: string,
@@ -122,13 +159,17 @@ export async function updatePeer(
newtId = newt.newtId;
}
await sendToClient(newtId, {
type: "newt/wg/peer/update",
data: {
publicKey,
...peer
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
newtId,
{
type: "newt/wg/peer/update",
data: {
publicKey,
...peer
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});

View File

@@ -1,9 +1,9 @@
import { sendToClient } from "#dynamic/routers/ws";
import { sendToClient, sendToClientsBatch } from "#dynamic/routers/ws";
import { clientSitesAssociationsCache, db, olms } from "@server/db";
import { canCompress } from "@server/lib/clientVersionChecks";
import config from "@server/lib/config";
import logger from "@server/logger";
import { and, eq } from "drizzle-orm";
import { and, eq, inArray } from "drizzle-orm";
import { Alias } from "yaml";
export async function addPeer(
@@ -205,3 +205,150 @@ export async function initPeerAddHandshake(
`Initiated peer add handshake for site ${peer.siteId} to olm ${olmId}`
);
}
export async function deletePeersBatch(
peers: {
clientId: number;
siteId: number;
publicKey: string;
olmId?: string;
version?: string | null;
}[]
) {
if (peers.length === 0) {
return;
}
const unresolvedClientIds = peers
.filter((peer) => !peer.olmId)
.map((peer) => peer.clientId);
const olmByClientId = new Map<
number,
{ olmId: string; version: string | null }
>();
if (unresolvedClientIds.length > 0) {
const olmRows = await db
.select({
clientId: olms.clientId,
olmId: olms.olmId,
version: olms.version
})
.from(olms)
.where(inArray(olms.clientId, unresolvedClientIds));
for (const row of olmRows) {
if (row.clientId !== null) {
olmByClientId.set(row.clientId, {
olmId: row.olmId,
version: row.version
});
}
}
}
const batchPayloads = peers
.map((peer) => {
const resolved = peer.olmId
? { olmId: peer.olmId, version: peer.version ?? null }
: olmByClientId.get(peer.clientId);
if (!resolved) {
return null;
}
return {
clientId: resolved.olmId,
message: {
type: "olm/wg/peer/remove",
data: {
publicKey: peer.publicKey,
siteId: peer.siteId
}
},
options: {
incrementConfigVersion: true,
compress: canCompress(
peer.version ?? resolved.version,
"olm"
)
}
};
})
.filter((payload) => payload !== null);
if (batchPayloads.length === 0) {
return;
}
await sendToClientsBatch(batchPayloads).catch((error) => {
logger.warn(`Error sending batched olm peer removals:`, error);
});
logger.info(`Deleted ${batchPayloads.length} peer(s) from olms (batch)`);
}
export async function initPeerAddHandshakeBatch(
handshakes: {
clientId: number;
peer: {
siteId: number;
exitNode: {
publicKey: string;
endpoint: string;
};
};
olmId: string;
chainId?: string;
}[]
) {
if (handshakes.length === 0) {
return;
}
await sendToClientsBatch(
handshakes.map((item) => ({
clientId: item.olmId,
message: {
type: "olm/wg/peer/holepunch/site/add",
data: {
siteId: item.peer.siteId,
exitNode: {
publicKey: item.peer.exitNode.publicKey,
relayPort:
config.getRawConfig().gerbil.clients_start_port,
endpoint: item.peer.exitNode.endpoint
},
chainId: item.chainId
}
},
options: { incrementConfigVersion: true }
}))
).catch((error) => {
logger.warn(`Error sending batched olm handshakes:`, error);
});
await Promise.all(
handshakes.map((item) =>
db
.update(clientSitesAssociationsCache)
.set({ isJitMode: false })
.where(
and(
eq(
clientSitesAssociationsCache.clientId,
item.clientId
),
eq(
clientSitesAssociationsCache.siteId,
item.peer.siteId
)
)
)
)
);
logger.info(
`Initiated ${handshakes.length} peer add handshake(s) to olms (batch)`
);
}

View File

@@ -76,6 +76,12 @@ export interface SendMessageOptions {
compress?: boolean;
}
export interface BatchSendMessage {
clientId: string;
message: WSMessage;
options?: SendMessageOptions;
}
// Redis message types for cross-node communication
export type RedisMessage =
| {

View File

@@ -26,7 +26,8 @@ import {
WebSocketRequest,
WSMessage,
AuthenticatedWebSocket,
SendMessageOptions
SendMessageOptions,
BatchSendMessage
} from "./types";
import { validateSessionToken } from "@server/auth/sessions/app";
@@ -212,6 +213,20 @@ const sendToClient = async (
return localSent;
};
const sendToClientsBatch = async (
entries: BatchSendMessage[]
): Promise<void> => {
if (entries.length === 0) {
return;
}
await Promise.all(
entries.map((entry) =>
sendToClient(entry.clientId, entry.message, entry.options)
)
);
};
const broadcastToAllExcept = async (
message: WSMessage,
excludeClientId?: string,
@@ -552,6 +567,7 @@ export {
router,
handleWSUpgrade,
sendToClient,
sendToClientsBatch,
broadcastToAllExcept,
connectedClients,
hasActiveConnections,