This commit is contained in:
Owen
2026-07-01 21:13:40 -04:00
parent 807613f28c
commit 4ab101f8a9
4 changed files with 199 additions and 96 deletions

View File

@@ -333,7 +333,7 @@ export async function getNextAvailableClientSubnet(
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
const release = () => lockManager.releaseLock(lockKey, acquired);
try {
const [org] = await transaction
@@ -395,7 +395,7 @@ export async function getNextAvailableAliasAddress(
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
const release = () => lockManager.releaseLock(lockKey, acquired);
try {
const [org] = await trx
@@ -463,7 +463,7 @@ export async function getNextAvailableOrgSubnet(): Promise<{
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
const release = () => lockManager.releaseLock(lockKey, acquired);
try {
const existingAddresses = await db

View File

@@ -1,3 +1,5 @@
import { randomUUID } from "crypto";
const instanceId = `local-${Math.random().toString(36).slice(2)}-${Date.now()}`;
type LocalLockRecord = {
@@ -15,58 +17,60 @@ export class LockManager {
}
}
/**
* Builds the owner string for this process: the module-level `instanceId`
* followed by a colon.
* @returns the `instanceId:` prefix string
*/
private getLocalOwnerToken(): string {
return `${instanceId}:`;
}
/**
* Acquire a distributed lock using Redis SET with NX and PX options
* Acquire a local in-process lock using an optimistic Map-based check.
* @param lockKey - Unique identifier for the lock
* @param ttlMs - Time to live in milliseconds
* @param maxRetries - Total number of acquisition attempts made before giving up
* @param retryDelayMs - Base wait between attempts in milliseconds; the wait
* doubles after every failed attempt (retryDelayMs * 2^attempt)
* @returns Promise<boolean> - true if lock acquired, false otherwise
* @returns Promise<string | null> - a token identifying this specific acquisition
* (truthy) on success, or null if the lock could not be acquired.
*/
async acquireLock(
lockKey: string,
ttlMs: number = 30000,
maxRetries: number = 3,
retryDelayMs: number = 100
): Promise<boolean> {
): Promise<string | null> {
for (let attempt = 0; attempt < maxRetries; attempt++) {
// NOTE(review): presumably evicts the record for lockKey once its TTL has
// lapsed -- the helper's body is not visible here, confirm.
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
if (!existing) {
// No record for this key: claim it with a token that is unique to this
// acquisition (instanceId plus a random UUID).
const token = `${instanceId}:${randomUUID()}`;
localLocks.set(lockKey, {
owner: this.getLocalOwnerToken(),
owner: token,
expiresAt: Date.now() + ttlMs
});
return true;
}
if (existing.owner === this.getLocalOwnerToken()) {
existing.expiresAt = Date.now() + ttlMs;
localLocks.set(lockKey, existing);
return true;
return token;
}
// The lock is currently held -- possibly by a different, unrelated
// caller in this same process. We intentionally do NOT treat
// same-process holders as automatically reentrant here: two
// independent logical operations (e.g. two different API requests)
// running concurrently in the same process must not both believe
// they hold the lock, or their writes under it can interleave
// unguarded. Just retry with backoff like any other contended lock.
// No wait is performed after the final attempt.
if (attempt < maxRetries - 1) {
const delay = retryDelayMs * Math.pow(2, attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
return false;
return null;
}
/**
* Release a lock using Lua script to ensure atomicity
* Release a lock previously acquired via acquireLock/acquireLockWithRetry.
* @param lockKey - Unique identifier for the lock
* @param token - the exact token returned by the acquisition being released.
* Required so a caller whose TTL already expired can't delete a
* different, currently-active holder's lock.
* @returns Promise<void> - resolves whether or not a record was deleted; a
* missing record or a non-matching owner is not reported to the caller.
*/
async releaseLock(lockKey: string): Promise<void> {
async releaseLock(lockKey: string, token: string): Promise<void> {
// NOTE(review): presumably drops the record first if its TTL has lapsed --
// helper body not visible here, confirm.
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
// Delete only when the stored owner matches; otherwise leave the map untouched.
if (existing && existing.owner === this.getLocalOwnerToken()) {
if (existing && existing.owner === token) {
localLocks.delete(lockKey);
}
}
@@ -100,23 +104,29 @@ export class LockManager {
const ttl = Math.max(0, existing.expiresAt - Date.now());
return {
exists: true,
ownedByMe: existing.owner === this.getLocalOwnerToken(),
ownedByMe: existing.owner.startsWith(`${instanceId}:`),
ttl,
owner: existing.owner.split(":")[0]
};
}
/**
* Extend the TTL of an existing lock owned by this worker
* Extend the TTL of an existing lock, provided the token matches the
* acquisition currently holding it.
* @param lockKey - Unique identifier for the lock
* @param ttlMs - New TTL in milliseconds
* @param token - the token returned by the acquisition being extended
* @returns Promise<boolean> - true if extended successfully
*/
async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
async extendLock(
lockKey: string,
ttlMs: number,
token: string
): Promise<boolean> {
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
if (!existing || existing.owner !== this.getLocalOwnerToken()) {
if (!existing || existing.owner !== token) {
return false;
}
@@ -131,14 +141,14 @@ export class LockManager {
* @param ttlMs - Time to live in milliseconds
* @param maxRetries - Maximum number of retry attempts
* @param baseDelayMs - Base delay between retries in milliseconds
* @returns Promise<boolean> - true if lock acquired
* @returns Promise<string | null> - token if acquired, null otherwise
*/
async acquireLockWithRetry(
lockKey: string,
ttlMs: number = 30000,
maxRetries: number = 5,
baseDelayMs: number = 100
): Promise<boolean> {
): Promise<string | null> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
const acquired = await this.acquireLock(
lockKey,
@@ -148,7 +158,7 @@ export class LockManager {
);
if (acquired) {
return true;
return acquired;
}
if (attempt < maxRetries) {
@@ -158,7 +168,7 @@ export class LockManager {
}
}
return false;
return null;
}
/**
@@ -173,16 +183,16 @@ export class LockManager {
fn: () => Promise<T>,
ttlMs: number = 30000
): Promise<T> {
const acquired = await this.acquireLock(lockKey, ttlMs);
const token = await this.acquireLock(lockKey, ttlMs);
if (!acquired) {
if (!token) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
try {
return await fn();
} finally {
await this.releaseLock(lockKey);
await this.releaseLock(lockKey, token);
}
}
@@ -204,7 +214,7 @@ export class LockManager {
let locksOwnedByMe = 0;
for (const value of localLocks.values()) {
if (value.owner === this.getLocalOwnerToken()) {
if (value.owner.startsWith(`${instanceId}:`)) {
locksOwnedByMe++;
}
}

View File

@@ -1,3 +1,5 @@
import logger from "@server/logger";
export type RebuildJobType = "site-resource" | "client";
export interface RebuildJob {
@@ -16,12 +18,104 @@ export interface RebuildQueueManager {
isQueued(job: RebuildJob): Promise<boolean>;
}
class NoopRebuildQueue implements RebuildQueueManager {
async enqueue(_job: RebuildJob): Promise<void> {}
startProcessing(_handlers: RebuildJobHandlers): void {}
async isQueued(_job: RebuildJob): Promise<boolean> {
return false;
// In-process FIFO used when there is no Redis to back a distributed queue
// (OSS build, or Redis unavailable). A job that loses the per-resource
// rebuild lock race lands here instead of being silently dropped, and gets
// retried shortly after against fresh DB state.
// Interval, in milliseconds, between polls of the in-memory queue (the
// setInterval period used by startProcessing).
const POLL_INTERVAL_MS = 500;
// Upper bound on the number of jobs dequeued and run during a single poll.
const BATCH_SIZE = 5;
/** Builds the de-duplication key for a job: its type and id joined by a colon. */
function dedupeKey(job: RebuildJob): string {
    const { type, id } = job;
    return `${type}:${id}`;
}
/**
 * In-process FIFO implementation of RebuildQueueManager.
 *
 * Jobs are de-duplicated by `dedupeKey` while they wait in the queue. A timer
 * started by `startProcessing` polls every POLL_INTERVAL_MS and runs up to
 * BATCH_SIZE jobs sequentially per poll. A job whose handler throws is logged
 * and dropped; it is not re-enqueued by this class.
 */
class InMemoryRebuildQueue implements RebuildQueueManager {
    /** Pending jobs in arrival order. */
    private queue: RebuildJob[] = [];
    /** Dedupe keys of the jobs currently waiting in `queue`. */
    private queuedSet = new Set<string>();
    /** True while a batch is running, so overlapping polls are skipped. */
    private processing = false;
    /** Guards against starting the poll timer more than once. */
    private processingStarted = false;
    private handlers: RebuildJobHandlers | null = null;

    /**
     * Reports whether an equivalent job is waiting in the queue.
     * A job that has already been dequeued for processing is not counted.
     */
    async isQueued(job: RebuildJob): Promise<boolean> {
        return this.queuedSet.has(dedupeKey(job));
    }

    /**
     * Appends a job to the tail of the queue unless an equivalent job is
     * already waiting, in which case the call is a no-op.
     */
    async enqueue(job: RebuildJob): Promise<void> {
        const key = dedupeKey(job);
        if (this.queuedSet.has(key)) {
            logger.debug(
                `Rebuild queue: skipped duplicate queued job ${job.type}:${job.id}`
            );
            return;
        }

        this.queuedSet.add(key);
        this.queue.push(job);
        logger.debug(
            `Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)`
        );
    }

    /**
     * Registers the job handlers and starts the poll timer.
     * Only the first call has any effect; later calls return immediately and
     * their handlers are ignored.
     */
    startProcessing(handlers: RebuildJobHandlers): void {
        if (this.processingStarted) return;
        this.processingStarted = true;
        this.handlers = handlers;

        setInterval(() => {
            // tryProcessBatch catches per-job errors itself; this catch is a
            // last resort so a rejection never escapes the timer callback.
            this.tryProcessBatch().catch((err) => {
                logger.error(
                    "Rebuild queue: unhandled error in process loop:",
                    err
                );
            });
        }, POLL_INTERVAL_MS);

        logger.info("Rebuild queue processor started (in-memory)");
    }

    /**
     * Runs up to BATCH_SIZE queued jobs one after another.
     * Does nothing when a batch is already in flight, no handlers are
     * registered, or the queue is empty.
     */
    private async tryProcessBatch(): Promise<void> {
        // Capture once so the handlers stay narrowed to non-null across awaits.
        const handlers = this.handlers;
        if (this.processing || !handlers || this.queue.length === 0) {
            return;
        }

        this.processing = true;
        try {
            for (let i = 0; i < BATCH_SIZE; i++) {
                const job = this.queue.shift();
                if (!job) break; // queue drained

                // Remove from the dedupe set once dequeued so the same job
                // can be re-queued while this one is in progress.
                this.queuedSet.delete(dedupeKey(job));

                logger.debug(
                    `Rebuild queue: processing ${job.type}:${job.id}`
                );
                try {
                    if (job.type === "site-resource") {
                        await handlers.onSiteResource(job.id);
                    } else if (job.type === "client") {
                        await handlers.onClient(job.id);
                    } else {
                        logger.warn(
                            `Rebuild queue: unknown job type "${String(
                                (job as { type: unknown }).type
                            )}", discarding`
                        );
                        // A discarded job was never run, so do not report it
                        // as completed.
                        continue;
                    }
                    logger.debug(
                        `Rebuild queue: completed ${job.type}:${job.id}`
                    );
                } catch (err) {
                    logger.error(
                        `Rebuild queue: job ${job.type}:${job.id} threw an error:`,
                        err
                    );
                }
            }
        } finally {
            this.processing = false;
        }
    }
}
export const rebuildQueue: RebuildQueueManager = new NoopRebuildQueue();
export const rebuildQueue: RebuildQueueManager = new InMemoryRebuildQueue();

View File

@@ -32,55 +32,59 @@ export class LockManager {
}
}
/**
* Builds the owner string used for local (non-Redis) lock records: the
* `instanceId` value followed by a colon.
* @returns the `instanceId:` prefix string
*/
private getLocalOwnerToken(): string {
return `${instanceId}:`;
}
/**
* Acquire a distributed lock using Redis SET with NX and PX options
* @param lockKey - Unique identifier for the lock
* @param ttlMs - Time to live in milliseconds
* @returns Promise<boolean> - true if lock acquired, false otherwise
* @returns Promise<string | null> - a token identifying this specific acquisition
* (truthy) on success, or null if the lock could not be acquired.
*/
async acquireLock(
lockKey: string,
ttlMs: number = 30000,
maxRetries: number = 3,
retryDelayMs: number = 100
): Promise<boolean> {
): Promise<string | null> {
if (!redis || !redis.status || redis.status !== "ready") {
for (let attempt = 0; attempt < maxRetries; attempt++) {
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
if (!existing) {
const token = `${instanceId}:${uuidv4()}`;
localLocks.set(lockKey, {
owner: this.getLocalOwnerToken(),
owner: token,
expiresAt: Date.now() + ttlMs
});
return true;
}
if (existing.owner === this.getLocalOwnerToken()) {
existing.expiresAt = Date.now() + ttlMs;
localLocks.set(lockKey, existing);
return true;
return token;
}
// Do not treat a same-process holder as automatically
// reentrant -- see the note in the Redis branch below.
if (attempt < maxRetries - 1) {
const delay = retryDelayMs * Math.pow(2, attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
return false;
return null;
}
const lockValue = `${instanceId}:${Date.now()}`;
const redisKey = `lock:${lockKey}`;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
// Every acquisition attempt gets its own unique token, even
// within the same process. Two independent logical operations
// (e.g. two different API requests handled by the same server)
// racing for this key must never both believe they hold the
// lock -- if we treated "existing value starts with my
// instanceId" as reentrant success, a second unrelated caller
// on this process could barge in while the first is still
// mid-flight, and their writes under the lock would interleave
// unguarded.
const lockValue = `${instanceId}:${uuidv4()}`;
// Use SET with NX (only set if not exists) and PX (expire in milliseconds)
// This is atomic and handles both setting and expiration
const result = await redis.set(
@@ -93,19 +97,7 @@ export class LockManager {
if (result === "OK") {
logger.debug(`Lock acquired: ${lockKey} by ${instanceId}`);
return true;
}
// Check if the existing lock is from this worker (reentrant behavior)
const existingValue = await redis.get(redisKey);
if (
existingValue &&
existingValue.startsWith(`${instanceId}:`)
) {
// Extend the lock TTL since it's the same worker
await redis.pexpire(redisKey, ttlMs);
logger.debug(`Lock extended: ${lockKey} by ${instanceId}`);
return true;
return lockValue;
}
// If this isn't our last attempt, wait before retrying with exponential backoff
@@ -132,18 +124,23 @@ export class LockManager {
logger.debug(
`Failed to acquire lock ${lockKey} after ${maxRetries} attempts`
);
return false;
return null;
}
/**
* Release a lock using Lua script to ensure atomicity
* Release a lock previously acquired via acquireLock/acquireLockWithRetry,
* using a Lua script to ensure we only delete it if it still matches the
* exact token from that acquisition (not just "owned by this process") --
* this ensures a caller whose TTL already expired can't delete a
* different, currently-active holder's lock.
* @param lockKey - Unique identifier for the lock
* @param token - the exact token returned by the acquisition being released
*/
async releaseLock(lockKey: string): Promise<void> {
async releaseLock(lockKey: string, token: string): Promise<void> {
if (!redis || !redis.status || redis.status !== "ready") {
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
if (existing && existing.owner === this.getLocalOwnerToken()) {
if (existing && existing.owner === token) {
localLocks.delete(lockKey);
}
return;
@@ -151,13 +148,12 @@ export class LockManager {
const redisKey = `lock:${lockKey}`;
// Lua script to ensure we only delete the lock if its stored value still equals the token passed in
const luaScript = `
local key = KEYS[1]
local worker_prefix = ARGV[1]
local expected_value = ARGV[1]
local current_value = redis.call('GET', key)
if current_value and string.find(current_value, worker_prefix, 1, true) == 1 then
if current_value and current_value == expected_value then
return redis.call('DEL', key)
else
return 0
@@ -169,16 +165,14 @@ export class LockManager {
luaScript,
1,
redisKey,
`${instanceId}:`
token
)) as number;
if (result === 1) {
logger.debug(`Lock released: ${lockKey} by ${instanceId}`);
} else {
logger.warn(
`Lock not released - not owned by worker: ${lockKey} by ${
instanceId
}`
`Lock not released - token did not match current holder: ${lockKey} (attempted by ${instanceId})`
);
}
} catch (error) {
@@ -230,7 +224,7 @@ export class LockManager {
const ttl = Math.max(0, existing.expiresAt - Date.now());
return {
exists: true,
ownedByMe: existing.owner === this.getLocalOwnerToken(),
ownedByMe: existing.owner.startsWith(`${instanceId}:`),
ttl,
owner: existing.owner.split(":")[0]
};
@@ -261,17 +255,23 @@ export class LockManager {
}
/**
* Extend the TTL of an existing lock owned by this worker
* Extend the TTL of an existing lock, provided the token matches the
* acquisition currently holding it.
* @param lockKey - Unique identifier for the lock
* @param ttlMs - New TTL in milliseconds
* @param token - the token returned by the acquisition being extended
* @returns Promise<boolean> - true if extended successfully
*/
async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
async extendLock(
lockKey: string,
ttlMs: number,
token: string
): Promise<boolean> {
if (!redis || !redis.status || redis.status !== "ready") {
this.clearExpiredLocalLock(lockKey);
const existing = localLocks.get(lockKey);
if (!existing || existing.owner !== this.getLocalOwnerToken()) {
if (!existing || existing.owner !== token) {
return false;
}
@@ -282,14 +282,13 @@ export class LockManager {
const redisKey = `lock:${lockKey}`;
// Lua script to extend the TTL only if the lock's stored value still equals the token passed in
const luaScript = `
local key = KEYS[1]
local worker_prefix = ARGV[1]
local expected_value = ARGV[1]
local ttl = tonumber(ARGV[2])
local current_value = redis.call('GET', key)
if current_value and string.find(current_value, worker_prefix, 1, true) == 1 then
if current_value and current_value == expected_value then
return redis.call('PEXPIRE', key, ttl)
else
return 0
@@ -301,7 +300,7 @@ export class LockManager {
luaScript,
1,
redisKey,
`${instanceId}:`,
token,
ttlMs.toString()
)) as number;
@@ -324,14 +323,14 @@ export class LockManager {
* @param ttlMs - Time to live in milliseconds
* @param maxRetries - Maximum number of retry attempts
* @param baseDelayMs - Base delay between retries in milliseconds
* @returns Promise<boolean> - true if lock acquired
* @returns Promise<string | null> - token if acquired, null otherwise
*/
async acquireLockWithRetry(
lockKey: string,
ttlMs: number = 30000,
maxRetries: number = 5,
baseDelayMs: number = 100
): Promise<boolean> {
): Promise<string | null> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
const acquired = await this.acquireLock(
lockKey,
@@ -341,7 +340,7 @@ export class LockManager {
);
if (acquired) {
return true;
return acquired;
}
if (attempt < maxRetries) {
@@ -355,7 +354,7 @@ export class LockManager {
logger.warn(
`Failed to acquire lock ${lockKey} after ${maxRetries + 1} attempts`
);
return false;
return null;
}
/**
@@ -370,16 +369,16 @@ export class LockManager {
fn: () => Promise<T>,
ttlMs: number = 30000
): Promise<T> {
const acquired = await this.acquireLock(lockKey, ttlMs);
const token = await this.acquireLock(lockKey, ttlMs);
if (!acquired) {
if (!token) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
try {
return await fn();
} finally {
await this.releaseLock(lockKey);
await this.releaseLock(lockKey, token);
}
}
@@ -402,7 +401,7 @@ export class LockManager {
let locksOwnedByMe = 0;
for (const value of localLocks.values()) {
if (value.owner === this.getLocalOwnerToken()) {
if (value.owner.startsWith(`${instanceId}:`)) {
locksOwnedByMe++;
}
}