Optimize client endpoint updates: bulk-update clientSitesAssociationsCache with inArray, fan out handleClientEndpointChange once per hole punch, skip relayed/JIT-mode sites, and parallelize newt peer updates

This commit is contained in:
Owen
2026-05-14 11:34:09 -07:00
parent 92a06e0ea3
commit 0e0666cacf

View File

@@ -11,7 +11,7 @@ import {
ExitNode
} from "@server/db";
import { db } from "@server/db";
import { eq, and } from "drizzle-orm";
import { eq, and, inArray } from "drizzle-orm";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
@@ -202,24 +202,29 @@ export async function updateAndGenerateEndpointDestinations(
)
);
// Update clientSites for each site on this exit node
// Format the endpoint properly for both IPv4 and IPv6
const formattedEndpoint = formatEndpoint(ip, port);
// Determine which rows actually need updating and whether the endpoint
// (as opposed to only the publicKey) changed for any of them.
const siteIdsToUpdate: number[] = [];
let endpointChanged = false;
for (const site of sitesOnExitNode) {
// logger.debug(
// `Updating site ${site.siteId} on exit node ${exitNode.exitNodeId}`
// );
// Format the endpoint properly for both IPv4 and IPv6
const formattedEndpoint = formatEndpoint(ip, port);
// if the public key or endpoint has changed, update it otherwise continue
if (
site.endpoint === formattedEndpoint &&
site.publicKey === publicKey
) {
continue;
}
siteIdsToUpdate.push(site.siteId);
if (site.endpoint !== formattedEndpoint) {
endpointChanged = true;
}
}
const [updatedClientSitesAssociationsCache] = await db
if (siteIdsToUpdate.length > 0) {
// Single bulk update for all affected rows for this client on this exit node
await db
.update(clientSitesAssociationsCache)
.set({
endpoint: formattedEndpoint,
@@ -228,24 +233,22 @@ export async function updateAndGenerateEndpointDestinations(
.where(
and(
eq(clientSitesAssociationsCache.clientId, olm.clientId),
eq(clientSitesAssociationsCache.siteId, site.siteId)
inArray(
clientSitesAssociationsCache.siteId,
siteIdsToUpdate
)
)
)
.returning();
);
if (
updatedClientSitesAssociationsCache.endpoint !==
site.endpoint && // this is the endpoint from the join table not the site
updatedClient.pubKey === publicKey // only trigger if the client's public key matches the current public key which means it has registered so we dont prematurely send the update
) {
// Only trigger downstream peer updates once per hole punch: the
// endpoint is the same for every site on this exit node, and
// handleClientEndpointChange already fans out to all connected
// sites for this client.
if (endpointChanged && updatedClient.pubKey === publicKey) {
logger.info(
`ClientSitesAssociationsCache for client ${olm.clientId} and site ${site.siteId} endpoint changed from ${site.endpoint} to ${updatedClientSitesAssociationsCache.endpoint}`
);
// Handle any additional logic for endpoint change
handleClientEndpointChange(
olm.clientId,
updatedClientSitesAssociationsCache.endpoint!
`ClientSitesAssociationsCache for client ${olm.clientId} endpoint changed to ${formattedEndpoint} for ${siteIdsToUpdate.length} site(s) on exit node ${exitNode.exitNodeId}`
);
handleClientEndpointChange(olm.clientId, formattedEndpoint);
}
}
@@ -456,11 +459,11 @@ async function handleSiteEndpointChange(siteId: number, newEndpoint: string) {
}
}
async function handleClientEndpointChange(
async function handleClientEndpointChange( // TODO: I THINK WE DONT NEED TO HIT EVERY SITE HERE BECAUSE WE ONLY NEED TO UPDATE THE SITES CONNECTED TO THIS NODE WHICH WE ALREADY HAVE FROM ABOVE
clientId: number,
newEndpoint: string
) {
// Alert all sites connected to this client that the endpoint has changed (only if NOT relayed)
// Alert all sites connected to this client that the endpoint has changed (only if NOT relayed and NOT JIT MODE)
try {
// Get client details
const [client] = await db
@@ -480,6 +483,7 @@ async function handleClientEndpointChange(
siteId: sites.siteId,
newtId: newts.newtId,
isRelayed: clientSitesAssociationsCache.isRelayed,
isJitMode: clientSitesAssociationsCache.isJitMode,
subnet: clients.subnet
})
.from(clientSitesAssociationsCache)
@@ -495,37 +499,47 @@ async function handleClientEndpointChange(
.where(
and(
eq(clientSitesAssociationsCache.clientId, clientId),
eq(clientSitesAssociationsCache.isRelayed, false)
eq(clientSitesAssociationsCache.isRelayed, false),
eq(clientSitesAssociationsCache.isJitMode, false)
)
);
// Update each non-relayed site with the new client endpoint
for (const siteData of connectedSites) {
try {
if (!siteData.subnet) {
if (connectedSites.length > 250) {
logger.warn(
`Client ${clientId} has ${connectedSites.length} connected sites so the client will be in jit mode anyway, skipping endpoint updates`
);
return;
}
// Update each non-relayed site with the new client endpoint (in parallel)
await Promise.allSettled(
connectedSites.map(async (siteData) => {
if (!siteData.subnet || !client.pubKey) {
logger.warn(
`Client ${clientId} has no subnet, skipping update for site ${siteData.siteId}`
`Client ${clientId} has no subnet or public key, skipping update for site ${siteData.siteId}`
);
continue;
return;
}
await updateNewtPeer(
siteData.siteId,
client.pubKey,
{
endpoint: newEndpoint
},
siteData.newtId
);
logger.debug(
`Updated site ${siteData.siteId} with new client ${clientId} endpoint: ${newEndpoint}`
);
} catch (error) {
logger.error(
`Failed to update site ${siteData.siteId} with new client endpoint: ${error}`
);
}
}
try {
await updateNewtPeer(
siteData.siteId,
client.pubKey,
{
endpoint: newEndpoint
},
siteData.newtId
);
logger.debug(
`Updated site ${siteData.siteId} with new client ${clientId} endpoint: ${newEndpoint}`
);
} catch (error) {
logger.error(
`Failed to update site ${siteData.siteId} with new client endpoint: ${error}`
);
}
})
);
} catch (error) {
logger.error(
`Error handling client endpoint change for client ${clientId}: ${error}`