mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-17 21:01:53 +00:00
Compare commits
1 Commits
copilot/fi
...
queue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06fb3685e4 |
@@ -24,6 +24,7 @@ import license from "#dynamic/license/license";
|
|||||||
import { initLogCleanupInterval } from "@server/lib/cleanupLogs";
|
import { initLogCleanupInterval } from "@server/lib/cleanupLogs";
|
||||||
import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync";
|
import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync";
|
||||||
import { fetchServerIp } from "@server/lib/serverIpService";
|
import { fetchServerIp } from "@server/lib/serverIpService";
|
||||||
|
import { startRebuildQueueProcessor } from "@server/lib/rebuildClientAssociations";
|
||||||
|
|
||||||
async function startServers() {
|
async function startServers() {
|
||||||
await setHostMeta();
|
await setHostMeta();
|
||||||
@@ -41,6 +42,7 @@ async function startServers() {
|
|||||||
|
|
||||||
initLogCleanupInterval();
|
initLogCleanupInterval();
|
||||||
initAcmeCertSync();
|
initAcmeCertSync();
|
||||||
|
startRebuildQueueProcessor();
|
||||||
|
|
||||||
// Start all servers
|
// Start all servers
|
||||||
const apiServer = createApiServer();
|
const apiServer = createApiServer();
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import {
|
|||||||
exitNodes,
|
exitNodes,
|
||||||
newts,
|
newts,
|
||||||
olms,
|
olms,
|
||||||
|
primaryDb,
|
||||||
roleSiteResources,
|
roleSiteResources,
|
||||||
Site,
|
Site,
|
||||||
SiteResource,
|
SiteResource,
|
||||||
@@ -40,6 +41,7 @@ import {
|
|||||||
removeTargets as removeSubnetProxyTargets
|
removeTargets as removeSubnetProxyTargets
|
||||||
} from "@server/routers/client/targets";
|
} from "@server/routers/client/targets";
|
||||||
import { lockManager } from "#dynamic/lib/lock";
|
import { lockManager } from "#dynamic/lib/lock";
|
||||||
|
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
|
||||||
|
|
||||||
// TTL for rebuild-association locks. These functions can fan out into many
|
// TTL for rebuild-association locks. These functions can fan out into many
|
||||||
// peer/proxy updates, so give them a generous window.
|
// peer/proxy updates, so give them a generous window.
|
||||||
@@ -167,11 +169,32 @@ export async function rebuildClientAssociationsFromSiteResource(
|
|||||||
subnet: string | null;
|
subnet: string | null;
|
||||||
}[];
|
}[];
|
||||||
}> {
|
}> {
|
||||||
|
try {
|
||||||
return await lockManager.withLock(
|
return await lockManager.withLock(
|
||||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||||
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource, trx),
|
() =>
|
||||||
|
rebuildClientAssociationsFromSiteResourceImpl(
|
||||||
|
siteResource,
|
||||||
|
trx
|
||||||
|
),
|
||||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||||
);
|
);
|
||||||
|
} catch (err: any) {
|
||||||
|
if (
|
||||||
|
typeof err?.message === "string" &&
|
||||||
|
err.message.startsWith("Failed to acquire lock")
|
||||||
|
) {
|
||||||
|
logger.warn(
|
||||||
|
`rebuildClientAssociations: could not acquire lock for site resource ${siteResource.siteResourceId}, queuing for deferred processing`
|
||||||
|
);
|
||||||
|
await rebuildQueue.enqueue({
|
||||||
|
type: "site-resource",
|
||||||
|
id: siteResource.siteResourceId
|
||||||
|
});
|
||||||
|
return { mergedAllClients: [] };
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function rebuildClientAssociationsFromSiteResourceImpl(
|
async function rebuildClientAssociationsFromSiteResourceImpl(
|
||||||
@@ -956,11 +979,28 @@ export async function rebuildClientAssociationsFromClient(
|
|||||||
client: Client,
|
client: Client,
|
||||||
trx: Transaction | typeof db = db
|
trx: Transaction | typeof db = db
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
try {
|
||||||
return await lockManager.withLock(
|
return await lockManager.withLock(
|
||||||
`rebuild-client-associations:client:${client.clientId}`,
|
`rebuild-client-associations:client:${client.clientId}`,
|
||||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||||
);
|
);
|
||||||
|
} catch (err: any) {
|
||||||
|
if (
|
||||||
|
typeof err?.message === "string" &&
|
||||||
|
err.message.startsWith("Failed to acquire lock")
|
||||||
|
) {
|
||||||
|
logger.warn(
|
||||||
|
`rebuildClientAssociations: could not acquire lock for client ${client.clientId}, queuing for deferred processing`
|
||||||
|
);
|
||||||
|
await rebuildQueue.enqueue({
|
||||||
|
type: "client",
|
||||||
|
id: client.clientId
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function rebuildClientAssociationsFromClientImpl(
|
async function rebuildClientAssociationsFromClientImpl(
|
||||||
@@ -1906,3 +1946,47 @@ export async function cleanupSiteAssociations(
|
|||||||
|
|
||||||
logger.debug(`cleanupSiteAssociations: DONE siteId=${siteId}`);
|
logger.debug(`cleanupSiteAssociations: DONE siteId=${siteId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the background rebuild queue processor. This should be called once
|
||||||
|
* during server startup. Only one server instance at a time will actively
|
||||||
|
* consume the queue (enforced via a distributed Redis lock); all other
|
||||||
|
* instances will poll and wait until the lock becomes available.
|
||||||
|
*/
|
||||||
|
export function startRebuildQueueProcessor(): void {
|
||||||
|
rebuildQueue.startProcessing({
|
||||||
|
onSiteResource: async (siteResourceId: number) => {
|
||||||
|
const [siteResource] = await primaryDb
|
||||||
|
.select()
|
||||||
|
.from(siteResources)
|
||||||
|
.where(eq(siteResources.siteResourceId, siteResourceId));
|
||||||
|
|
||||||
|
if (!siteResource) {
|
||||||
|
logger.warn(
|
||||||
|
`Rebuild queue: site resource ${siteResourceId} not found, skipping`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await rebuildClientAssociationsFromSiteResource(
|
||||||
|
siteResource,
|
||||||
|
primaryDb
|
||||||
|
);
|
||||||
|
},
|
||||||
|
onClient: async (clientId: number) => {
|
||||||
|
const [client] = await primaryDb
|
||||||
|
.select()
|
||||||
|
.from(clients)
|
||||||
|
.where(eq(clients.clientId, clientId));
|
||||||
|
|
||||||
|
if (!client) {
|
||||||
|
logger.warn(
|
||||||
|
`Rebuild queue: client ${clientId} not found, skipping`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await rebuildClientAssociationsFromClient(client, primaryDb);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
23
server/lib/rebuildQueue.ts
Normal file
23
server/lib/rebuildQueue.ts
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
export type RebuildJobType = "site-resource" | "client";
|
||||||
|
|
||||||
|
export interface RebuildJob {
|
||||||
|
type: RebuildJobType;
|
||||||
|
id: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RebuildJobHandlers {
|
||||||
|
onSiteResource(siteResourceId: number): Promise<void>;
|
||||||
|
onClient(clientId: number): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RebuildQueueManager {
|
||||||
|
enqueue(job: RebuildJob): Promise<void>;
|
||||||
|
startProcessing(handlers: RebuildJobHandlers): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
class NoopRebuildQueue implements RebuildQueueManager {
|
||||||
|
async enqueue(_job: RebuildJob): Promise<void> {}
|
||||||
|
startProcessing(_handlers: RebuildJobHandlers): void {}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const rebuildQueue: RebuildQueueManager = new NoopRebuildQueue();
|
||||||
169
server/private/lib/rebuildQueue.ts
Normal file
169
server/private/lib/rebuildQueue.ts
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of a proprietary work.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2025-2026 Fossorial, Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* This file is licensed under the Fossorial Commercial License.
|
||||||
|
* You may not use this file except in compliance with the License.
|
||||||
|
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||||
|
*
|
||||||
|
* This file is not licensed under the AGPLv3.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { redis } from "#private/lib/redis";
|
||||||
|
import { lockManager } from "#dynamic/lib/lock";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
|
||||||
|
export type RebuildJobType = "site-resource" | "client";
|
||||||
|
|
||||||
|
export interface RebuildJob {
|
||||||
|
type: RebuildJobType;
|
||||||
|
id: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RebuildJobHandlers {
|
||||||
|
onSiteResource(siteResourceId: number): Promise<void>;
|
||||||
|
onClient(clientId: number): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order).
|
||||||
|
const QUEUE_KEY = "rebuild-client-associations:queue";
|
||||||
|
|
||||||
|
// Distributed lock that serialises queue consumption to a single server instance
|
||||||
|
// at a time. TTL is generous enough to cover a full batch of expensive rebuilds.
|
||||||
|
const PROCESSOR_LOCK_KEY = "rebuild-client-associations:processor";
|
||||||
|
|
||||||
|
// Each rebuild can take up to REBUILD_ASSOCIATIONS_LOCK_TTL_MS (120 s) per
|
||||||
|
// resource. Allow BATCH_SIZE resources per processor-lock acquisition, plus a
|
||||||
|
// small buffer.
|
||||||
|
const BATCH_SIZE = 5;
|
||||||
|
const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s
|
||||||
|
|
||||||
|
const POLL_INTERVAL_MS = 500;
|
||||||
|
|
||||||
|
class RedisRebuildQueue {
|
||||||
|
private processingStarted = false;
|
||||||
|
|
||||||
|
async enqueue(job: RebuildJob): Promise<void> {
|
||||||
|
if (!redis || redis.status !== "ready") {
|
||||||
|
logger.warn(
|
||||||
|
`Rebuild queue: Redis not available — rebuild for ${job.type}:${job.id} will not be retried`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await redis.rpush(QUEUE_KEY, JSON.stringify(job));
|
||||||
|
logger.debug(
|
||||||
|
`Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)`
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
`Rebuild queue: failed to enqueue ${job.type}:${job.id}:`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startProcessing(handlers: RebuildJobHandlers): void {
|
||||||
|
if (this.processingStarted) return;
|
||||||
|
this.processingStarted = true;
|
||||||
|
|
||||||
|
this.processLoop(handlers).catch((err) => {
|
||||||
|
logger.error("Rebuild queue processor loop crashed:", err);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("Rebuild queue processor started");
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processLoop(handlers: RebuildJobHandlers): Promise<void> {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
await this.tryProcessBatch(handlers);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
"Rebuild queue: unhandled error in process loop:",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
await new Promise((resolve) =>
|
||||||
|
setTimeout(resolve, POLL_INTERVAL_MS)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async tryProcessBatch(handlers: RebuildJobHandlers): Promise<void> {
|
||||||
|
if (!redis || redis.status !== "ready") return;
|
||||||
|
|
||||||
|
// Peek before acquiring the processor lock to avoid unnecessary Redis
|
||||||
|
// round-trips and lock contention when the queue is idle.
|
||||||
|
const queueLength = await redis.llen(QUEUE_KEY).catch(() => 0);
|
||||||
|
if (queueLength === 0) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await lockManager.withLock(
|
||||||
|
PROCESSOR_LOCK_KEY,
|
||||||
|
async () => {
|
||||||
|
for (let i = 0; i < BATCH_SIZE; i++) {
|
||||||
|
if (!redis || redis.status !== "ready") break;
|
||||||
|
|
||||||
|
const payload = await redis.lpop(QUEUE_KEY);
|
||||||
|
if (payload === null) break; // queue drained
|
||||||
|
|
||||||
|
let job: RebuildJob;
|
||||||
|
try {
|
||||||
|
job = JSON.parse(payload) as RebuildJob;
|
||||||
|
} catch {
|
||||||
|
logger.error(
|
||||||
|
`Rebuild queue: could not parse job payload, discarding: ${payload}`
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
`Rebuild queue: processing ${job.type}:${job.id}`
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (job.type === "site-resource") {
|
||||||
|
await handlers.onSiteResource(job.id);
|
||||||
|
} else if (job.type === "client") {
|
||||||
|
await handlers.onClient(job.id);
|
||||||
|
} else {
|
||||||
|
logger.warn(
|
||||||
|
`Rebuild queue: unknown job type "${(job as any).type}", discarding`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
`Rebuild queue: completed ${job.type}:${job.id}`
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
PROCESSOR_LOCK_TTL_MS
|
||||||
|
);
|
||||||
|
} catch (err: any) {
|
||||||
|
if (
|
||||||
|
typeof err?.message === "string" &&
|
||||||
|
err.message.startsWith("Failed to acquire lock")
|
||||||
|
) {
|
||||||
|
// Another server instance currently holds the processor lock and
|
||||||
|
// is consuming the queue — nothing to do this cycle.
|
||||||
|
logger.debug(
|
||||||
|
"Rebuild queue: processor lock held by another instance, skipping this cycle"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const rebuildQueue: RedisRebuildQueue = new RedisRebuildQueue();
|
||||||
Reference in New Issue
Block a user