This commit is contained in:
Owen
2026-06-24 11:54:56 -04:00
parent bc63747efe
commit 034bcbd271
3 changed files with 387 additions and 253 deletions

View File

@@ -38,7 +38,9 @@ import {
addPeerDataBatch,
addTargetsBatch as addSubnetProxyTargetsBatch,
removePeerDataBatch,
removeTargetsBatch as removeSubnetProxyTargetsBatch
removeTargetsBatch as removeSubnetProxyTargetsBatch,
updatePeerDataBatch,
updateTargets
} from "@server/routers/client/targets";
import { lockManager } from "#dynamic/lib/lock";
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
@@ -162,15 +164,10 @@ export async function getClientSiteResourceAccess(
export async function rebuildClientAssociationsFromSiteResource(
siteResource: SiteResource
) {
const trx = primaryDb;
try {
return await lockManager.withLock(
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
() =>
rebuildClientAssociationsFromSiteResourceImpl(
siteResource,
trx
),
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
);
} catch (err: any) {
@@ -192,15 +189,10 @@ export async function rebuildClientAssociationsFromSiteResource(
}
async function rebuildClientAssociationsFromSiteResourceImpl(
siteResource: SiteResource,
trx: Transaction | typeof db = db
): Promise<{
mergedAllClients: {
clientId: number;
pubKey: string | null;
subnet: string | null;
}[];
}> {
siteResource: SiteResource
) {
const trx = primaryDb;
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] START siteResourceId=${siteResource.siteResourceId} networkId=${siteResource.networkId} orgId=${siteResource.orgId}`
);
@@ -485,10 +477,6 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
clientSiteResourcesToRemove,
trx
);
return {
mergedAllClients
};
}
async function handleMessagesForSiteClients(
@@ -1042,6 +1030,312 @@ async function handleSubnetProxyTargetUpdates(
await Promise.all([...proxyJobs, ...olmJobs]);
}
export async function handleMessagingForUpdatedSiteResource(
existingSiteResource: SiteResource | undefined,
updatedSiteResource: SiteResource,
existingSiteIds: number[],
updatedSiteIds: number[],
trx: Transaction | typeof db = db
) {
logger.debug(
"handleMessagingForUpdatedSiteResource: existingSiteResource is: ",
existingSiteResource
);
logger.debug(
"handleMessagingForUpdatedSiteResource: updatedSiteResource is: ",
updatedSiteResource
);
const allSiteIds = [...new Set([...existingSiteIds, ...updatedSiteIds])];
const newtsForSites =
allSiteIds.length > 0
? await trx
.select()
.from(newts)
.where(inArray(newts.siteId, allSiteIds))
: [];
const newtBySiteId = new Map(
newtsForSites.map((newt) => [newt.siteId, newt])
);
// get all of the clients from the cache
const targets = await generateSubnetProxyTargetV2(
updatedSiteResource,
mergedAllClients
);
const oldDestinationStillInUseClientSitePairs = new Set<string>();
if (
existingSiteResource?.destination &&
allSiteIds.length > 0 &&
mergedAllClientIds.length > 0
) {
const oldDestinationStillInUseRows = await trx
.select({
clientId: clientSiteResourcesAssociationsCache.clientId,
siteId: siteNetworks.siteId
})
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
clientSiteResourcesAssociationsCache.siteResourceId,
siteResources.siteResourceId
)
)
.innerJoin(
siteNetworks,
eq(siteNetworks.networkId, siteResources.networkId)
)
.where(
and(
inArray(
clientSiteResourcesAssociationsCache.clientId,
mergedAllClientIds
),
inArray(siteNetworks.siteId, allSiteIds),
eq(
siteResources.destination,
existingSiteResource.destination
),
ne(
siteResources.siteResourceId,
existingSiteResource.siteResourceId
)
)
);
for (const row of oldDestinationStillInUseRows) {
oldDestinationStillInUseClientSitePairs.add(
`${row.clientId}:${row.siteId}`
);
}
}
//////////////////////////// FROM HERE DOWN WE ARE DEALING WITH REMOVING SITES
const removedSiteIds = existingSiteIds.filter(
(id) => !updatedSiteIds.includes(id)
);
const targetsToRemoveBatch: {
newtId: string;
targets: any[];
version: string | null;
}[] = [];
const peerDataRemoves: {
clientId: number;
siteId: number;
remoteSubnets: string[];
aliases: ReturnType<typeof generateAliasConfig>;
}[] = [];
if (targets) {
for (const siteId of removedSiteIds) {
const newt = newtBySiteId.get(siteId);
if (!newt) {
continue;
}
targetsToRemoveBatch.push({
newtId: newt.newtId,
targets: targets,
version: newt.version
});
for (const client of mergedAllClients) {
const oldDestinationStillInUseByASite =
oldDestinationStillInUseClientSitePairs.has(
`${client.clientId}:${siteId}`
);
peerDataRemoves.push({
// this might happen twice after the rebuild function but that is okay
clientId: client.clientId,
siteId,
remoteSubnets: !oldDestinationStillInUseByASite
? generateRemoteSubnets([updatedSiteResource])
: [],
aliases: generateAliasConfig([updatedSiteResource])
});
}
}
}
removeSubnetProxyTargetsBatch(targetsToRemoveBatch);
removePeerDataBatch(peerDataRemoves);
//////////////////////////// FROM HERE DOWN WE ARE DEALING WITH ADDING NEW SITES
const addedSiteIds = updatedSiteIds.filter(
(id) => !existingSiteIds.includes(id)
);
const targetsToAddBatch: {
newtId: string;
targets: any[];
version: string | null;
}[] = [];
const peerDataAdds: {
clientId: number;
siteId: number;
remoteSubnets: string[];
aliases: ReturnType<typeof generateAliasConfig>;
}[] = [];
if (targets) {
for (const siteId of addedSiteIds) {
const newt = newtBySiteId.get(siteId);
if (!newt) {
continue;
}
targetsToAddBatch.push({
newtId: newt.newtId,
targets: targets,
version: newt.version
});
for (const client of mergedAllClients) {
peerDataAdds.push({
clientId: client.clientId,
siteId,
remoteSubnets: generateRemoteSubnets([updatedSiteResource]),
aliases: generateAliasConfig([updatedSiteResource])
});
}
}
}
addSubnetProxyTargetsBatch(targetsToAddBatch);
addPeerDataBatch(peerDataAdds);
//////////////////////////// FROM HERE DOWN WE ARE DEALING WITH UPDATING THE EXISTING SITES
const unchangedSiteIds = existingSiteIds.filter((id) =>
updatedSiteIds.includes(id)
);
// after everything is rebuilt above we still need to update the targets and remote subnets if the destination changed
const destinationChanged =
existingSiteResource &&
existingSiteResource.destination !== updatedSiteResource.destination;
const destinationPortChanged =
existingSiteResource &&
existingSiteResource.destinationPort !==
updatedSiteResource.destinationPort;
const aliasChanged =
existingSiteResource &&
existingSiteResource.alias !== updatedSiteResource.alias;
const fullDomainChanged =
existingSiteResource &&
existingSiteResource.fullDomain !== updatedSiteResource.fullDomain;
const sslChanged =
existingSiteResource &&
existingSiteResource.ssl !== updatedSiteResource.ssl;
const portRangesChanged =
existingSiteResource &&
(existingSiteResource.tcpPortRangeString !==
updatedSiteResource.tcpPortRangeString ||
existingSiteResource.udpPortRangeString !==
updatedSiteResource.udpPortRangeString ||
existingSiteResource.disableIcmp !==
updatedSiteResource.disableIcmp);
// if the existingSiteResource is undefined (new resource) we don't need to do anything here, the rebuild above handled it all
if (
destinationChanged ||
aliasChanged ||
fullDomainChanged ||
sslChanged ||
portRangesChanged ||
destinationPortChanged
) {
const shouldUpdateTargets =
destinationChanged ||
sslChanged ||
portRangesChanged ||
fullDomainChanged ||
destinationPortChanged;
const oldTargets = shouldUpdateTargets
? await generateSubnetProxyTargetV2(
existingSiteResource,
mergedAllClients
)
: [];
const newTargets = shouldUpdateTargets
? await generateSubnetProxyTargetV2(
updatedSiteResource,
mergedAllClients
)
: [];
const peerDataUpdateBatch: Parameters<typeof updatePeerDataBatch>[0] =
[];
for (const siteId of unchangedSiteIds) {
const newt = newtBySiteId.get(siteId);
if (!newt) {
throw new Error(
"Newt not found for site during site resource update"
);
}
// Only update targets on newt if these items change
if (shouldUpdateTargets) {
await updateTargets(
newt.newtId,
{
oldTargets: oldTargets ? oldTargets : [],
newTargets: newTargets ? newTargets : []
},
newt.version
);
}
for (const client of mergedAllClients) {
// does this client have access to another resource on this site that has the same destination still? if so we dont want to remove it from their olm yet
if (!existingSiteResource.destination) {
continue;
}
const oldDestinationStillInUseByASite =
oldDestinationStillInUseClientSitePairs.has(
`${client.clientId}:${siteId}`
);
// we also need to update the remote subnets on the olms for each client that has access to this site
peerDataUpdateBatch.push({
clientId: client.clientId,
siteId,
remoteSubnets: destinationChanged
? {
oldRemoteSubnets: !oldDestinationStillInUseByASite
? generateRemoteSubnets([
existingSiteResource
])
: [],
newRemoteSubnets: generateRemoteSubnets([
updatedSiteResource
])
}
: undefined,
aliases:
aliasChanged || fullDomainChanged // the full domain is sent down as an alias
? {
oldAliases: generateAliasConfig([
existingSiteResource
]),
newAliases: generateAliasConfig([
updatedSiteResource
])
}
: undefined
});
}
}
updatePeerDataBatch(peerDataUpdateBatch);
}
}
export async function rebuildClientAssociationsFromClient(
client: Client
): Promise<void> {

View File

@@ -438,6 +438,70 @@ export async function removePeerDataBatch(
await sendToClientsBatch(payloads);
}
export async function updatePeerDataBatch(
entries: {
clientId: number;
siteId: number;
remoteSubnets:
| {
oldRemoteSubnets: string[];
newRemoteSubnets: string[];
}
| undefined;
aliases:
| {
oldAliases: Alias[];
newAliases: Alias[];
}
| undefined;
olmId?: string;
version?: string | null;
}[]
) {
if (entries.length === 0) {
return;
}
const resolvedTargets = await resolveOlmTargets(entries);
if (resolvedTargets.length === 0) {
return;
}
const payloads = entries
.map((entry) => {
const resolved = resolvedTargets.find(
(target) => target.clientId === entry.clientId
);
if (!resolved) {
return null;
}
return {
clientId: resolved.olmId,
message: {
type: `olm/wg/peer/data/update`,
data: {
siteId: entry.siteId,
...entry.remoteSubnets,
...entry.aliases
}
},
options: {
incrementConfigVersion: true,
compress: canCompress(resolved.version, "olm")
}
};
})
.filter((entry) => entry !== null);
if (payloads.length === 0) {
return;
}
await sendToClientsBatch(payloads);
}
export async function updatePeerData(
clientId: number,
siteId: number,

View File

@@ -1,8 +1,6 @@
import {
clientSiteResources,
clientSiteResourcesAssociationsCache,
db,
newts,
orgs,
roles,
roleSiteResources,
@@ -10,10 +8,7 @@ import {
SiteResource,
siteResources,
sites,
networks,
Transaction,
userSiteResources,
primaryDb
userSiteResources
} from "@server/db";
import { isLicensedOrSubscribed } from "#dynamic/lib/isLicencedOrSubscribed";
import { TierFeature, tierMatrix } from "@server/lib/billing/tierMatrix";
@@ -21,18 +16,8 @@ import { validateAndConstructDomain } from "@server/lib/domainUtils";
import response from "@server/lib/response";
import { eq, and, ne, inArray } from "drizzle-orm";
import { OpenAPITags, registry } from "@server/openApi";
import { updatePeerData, updateTargets } from "@server/routers/client/targets";
import {
generateAliasConfig,
generateRemoteSubnets,
generateSubnetProxyTargetV2,
isIpInCidr,
portRangeStringSchema
} from "@server/lib/ip";
import {
getClientSiteResourceAccess,
rebuildClientAssociationsFromSiteResource
} from "@server/lib/rebuildClientAssociations";
import { isIpInCidr, portRangeStringSchema } from "@server/lib/ip";
import { rebuildClientAssociationsFromSiteResource } from "@server/lib/rebuildClientAssociations";
import logger from "@server/logger";
import HttpCode from "@server/types/HttpCode";
import { NextFunction, Request, Response } from "express";
@@ -390,7 +375,7 @@ export async function updateSiteResource(
);
}
const existingSiteIds = existingSiteResource.networkId
const existingSiteNetworks = existingSiteResource.networkId
? await db
.select()
.from(siteNetworks)
@@ -398,7 +383,7 @@ export async function updateSiteResource(
eq(siteNetworks.networkId, existingSiteResource.networkId)
)
: [];
const existingSiteIdSet = new Set(existingSiteIds.map((s) => s.siteId));
const existingSiteIds = existingSiteNetworks.map((sn) => sn.siteId);
let fullDomain: string | null = null;
let finalSubdomain: string | null = null;
@@ -464,6 +449,7 @@ export async function updateSiteResource(
}
let updatedSiteResource: SiteResource | undefined;
let updatedSiteIds: number[] = [];
await db.transaction(async (trx) => {
// Update the site resource
const sshPamSet =
@@ -534,6 +520,7 @@ export async function updateSiteResource(
siteId: siteId,
networkId: updatedSiteResource.networkId!
});
updatedSiteIds.push(siteId);
}
await trx
@@ -616,11 +603,8 @@ export async function updateSiteResource(
handleMessagingForUpdatedSiteResource(
existingSiteResource,
updatedSiteResource,
Array.from(existingSiteIdSet).map((siteId: number) => ({
// we already added to the new sites above in the rebuild function so we only need to update the ones that did not change
siteId,
orgId: existingSiteResource.orgId
}))
existingSiteIds,
updatedSiteIds
).catch((e) => {
logger.error(
`Failed to handle messaging for updated site resource ${siteResourceId}. Error: ${e}`
@@ -644,211 +628,3 @@ export async function updateSiteResource(
);
}
}
export async function handleMessagingForUpdatedSiteResource(
existingSiteResource: SiteResource | undefined,
updatedSiteResource: SiteResource,
sites: { siteId: number; orgId: string }[]
) {
const trx = primaryDb;
logger.debug(
"handleMessagingForUpdatedSiteResource: existingSiteResource is: ",
existingSiteResource
);
logger.debug(
"handleMessagingForUpdatedSiteResource: updatedSiteResource is: ",
updatedSiteResource
);
const { sitesList, mergedAllClients, mergedAllClientIds } =
await getClientSiteResourceAccess(
existingSiteResource || updatedSiteResource,
trx
);
const siteIds = sites.map((site) => site.siteId);
// after everything is rebuilt above we still need to update the targets and remote subnets if the destination changed
const destinationChanged =
existingSiteResource &&
existingSiteResource.destination !== updatedSiteResource.destination;
const destinationPortChanged =
existingSiteResource &&
existingSiteResource.destinationPort !==
updatedSiteResource.destinationPort;
const aliasChanged =
existingSiteResource &&
existingSiteResource.alias !== updatedSiteResource.alias;
const fullDomainChanged =
existingSiteResource &&
existingSiteResource.fullDomain !== updatedSiteResource.fullDomain;
const sslChanged =
existingSiteResource &&
existingSiteResource.ssl !== updatedSiteResource.ssl;
const portRangesChanged =
existingSiteResource &&
(existingSiteResource.tcpPortRangeString !==
updatedSiteResource.tcpPortRangeString ||
existingSiteResource.udpPortRangeString !==
updatedSiteResource.udpPortRangeString ||
existingSiteResource.disableIcmp !==
updatedSiteResource.disableIcmp);
// if the existingSiteResource is undefined (new resource) we don't need to do anything here, the rebuild above handled it all
if (
destinationChanged ||
aliasChanged ||
fullDomainChanged ||
sslChanged ||
portRangesChanged ||
destinationPortChanged
) {
const newtsForSites =
siteIds.length > 0
? await trx
.select()
.from(newts)
.where(inArray(newts.siteId, siteIds))
: [];
const newtBySiteId = new Map(
newtsForSites.map((newt) => [newt.siteId, newt])
);
const oldDestinationStillInUseClientSitePairs = new Set<string>();
if (
existingSiteResource?.destination &&
siteIds.length > 0 &&
mergedAllClientIds.length > 0
) {
const oldDestinationStillInUseRows = await trx
.select({
clientId: clientSiteResourcesAssociationsCache.clientId,
siteId: siteNetworks.siteId
})
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
clientSiteResourcesAssociationsCache.siteResourceId,
siteResources.siteResourceId
)
)
.innerJoin(
siteNetworks,
eq(siteNetworks.networkId, siteResources.networkId)
)
.where(
and(
inArray(
clientSiteResourcesAssociationsCache.clientId,
mergedAllClientIds
),
inArray(siteNetworks.siteId, siteIds),
eq(
siteResources.destination,
existingSiteResource.destination
),
ne(
siteResources.siteResourceId,
existingSiteResource.siteResourceId
)
)
);
for (const row of oldDestinationStillInUseRows) {
oldDestinationStillInUseClientSitePairs.add(
`${row.clientId}:${row.siteId}`
);
}
}
const shouldUpdateTargets =
destinationChanged ||
sslChanged ||
portRangesChanged ||
fullDomainChanged ||
destinationPortChanged;
const oldTargets = shouldUpdateTargets
? await generateSubnetProxyTargetV2(
existingSiteResource,
mergedAllClients
)
: [];
const newTargets = shouldUpdateTargets
? await generateSubnetProxyTargetV2(
updatedSiteResource,
mergedAllClients
)
: [];
for (const site of sites) {
const newt = newtBySiteId.get(site.siteId);
if (!newt) {
throw new Error(
"Newt not found for site during site resource update"
);
}
// Only update targets on newt if these items change
if (shouldUpdateTargets) {
await updateTargets(
newt.newtId,
{
oldTargets: oldTargets ? oldTargets : [],
newTargets: newTargets ? newTargets : []
},
newt.version
);
}
const olmJobs: Promise<void>[] = [];
for (const client of mergedAllClients) {
// does this client have access to another resource on this site that has the same destination still? if so we dont want to remove it from their olm yet
if (!existingSiteResource.destination) {
continue;
}
const oldDestinationStillInUseByASite =
oldDestinationStillInUseClientSitePairs.has(
`${client.clientId}:${site.siteId}`
);
// we also need to update the remote subnets on the olms for each client that has access to this site
olmJobs.push(
updatePeerData(
// TODO: THIS SHOULD BE UPDATED TO WORK I A BATCH
client.clientId,
site.siteId,
destinationChanged
? {
oldRemoteSubnets:
!oldDestinationStillInUseByASite
? generateRemoteSubnets([
existingSiteResource
])
: [],
newRemoteSubnets: generateRemoteSubnets([
updatedSiteResource
])
}
: undefined,
aliasChanged || fullDomainChanged // the full domain is sent down as an alias
? {
oldAliases: generateAliasConfig([
existingSiteResource
]),
newAliases: generateAliasConfig([
updatedSiteResource
])
}
: undefined
)
);
}
await Promise.all(olmJobs);
}
}
}