From 242123b8753e05487ef7e84b09f180fb6d777808 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 24 Jun 2026 16:00:54 -0400 Subject: [PATCH] Implement non-redis lock --- server/lib/lock.ts | 124 ++++++++++++++++++++++++++++-- server/private/lib/lock.ts | 153 +++++++++++++++++++++++++------------ 2 files changed, 222 insertions(+), 55 deletions(-) diff --git a/server/lib/lock.ts b/server/lib/lock.ts index 15d1f39e1..3cd1b8704 100644 --- a/server/lib/lock.ts +++ b/server/lib/lock.ts @@ -1,4 +1,24 @@ +const instanceId = `local-${Math.random().toString(36).slice(2)}-${Date.now()}`; + +type LocalLockRecord = { + owner: string; + expiresAt: number; +}; + +const localLocks = new Map(); + export class LockManager { + private clearExpiredLocalLock(lockKey: string): void { + const current = localLocks.get(lockKey); + if (current && current.expiresAt <= Date.now()) { + localLocks.delete(lockKey); + } + } + + private getLocalOwnerToken(): string { + return `${instanceId}:`; + } + /** * Acquire a distributed lock using Redis SET with NX and PX options * @param lockKey - Unique identifier for the lock @@ -7,22 +27,57 @@ export class LockManager { */ async acquireLock( lockKey: string, - ttlMs: number = 30000 + ttlMs: number = 30000, + maxRetries: number = 3, + retryDelayMs: number = 100 ): Promise { - return true; + for (let attempt = 0; attempt < maxRetries; attempt++) { + this.clearExpiredLocalLock(lockKey); + + const existing = localLocks.get(lockKey); + if (!existing) { + localLocks.set(lockKey, { + owner: this.getLocalOwnerToken(), + expiresAt: Date.now() + ttlMs + }); + return true; + } + + if (existing.owner === this.getLocalOwnerToken()) { + existing.expiresAt = Date.now() + ttlMs; + localLocks.set(lockKey, existing); + return true; + } + + if (attempt < maxRetries - 1) { + const delay = retryDelayMs * Math.pow(2, attempt); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + return false; } /** * Release a lock using Lua script to ensure atomicity * @param lockKey - Unique identifier for the lock */ - async releaseLock(lockKey: string): Promise {} + async releaseLock(lockKey: string): Promise { + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + + if (existing && existing.owner === this.getLocalOwnerToken()) { + localLocks.delete(lockKey); + } + } /** * Force release a lock regardless of owner (use with caution) * @param lockKey - Unique identifier for the lock */ - async forceReleaseLock(lockKey: string): Promise {} + async forceReleaseLock(lockKey: string): Promise { + localLocks.delete(lockKey); + } /** * Check if a lock exists and get its info @@ -35,7 +90,20 @@ export class LockManager { ttl: number; owner?: string; }> { - return { exists: false, ownedByMe: false, ttl: 0 }; + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + + if (!existing) { + return { exists: false, ownedByMe: false, ttl: 0 }; + } + + const ttl = Math.max(0, existing.expiresAt - Date.now()); + return { + exists: true, + ownedByMe: existing.owner === this.getLocalOwnerToken(), + ttl, + owner: existing.owner.split(":")[0] + }; } /** @@ -45,6 +113,15 @@ export class LockManager { * @returns Promise - true if extended successfully */ async extendLock(lockKey: string, ttlMs: number): Promise { + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + + if (!existing || existing.owner !== this.getLocalOwnerToken()) { + return false; + } + + existing.expiresAt = Date.now() + ttlMs; + localLocks.set(lockKey, existing); return true; } @@ -62,7 +139,26 @@ export class LockManager { maxRetries: number = 5, baseDelayMs: number = 100 ): Promise { - return true; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + const acquired = await this.acquireLock( + lockKey, + ttlMs, + 1, + baseDelayMs + ); + + if (acquired) { + return true; + } + + if (attempt < maxRetries) { + const delay = + baseDelayMs * Math.pow(2, attempt) + Math.random() * 100; + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + return false; } /** @@ -99,7 +195,21 @@ export class LockManager { activeLocksCount: number; locksOwnedByMe: number; }> { - return { activeLocksCount: 0, locksOwnedByMe: 0 }; + const now = Date.now(); + for (const [key, value] of localLocks.entries()) { + if (value.expiresAt <= now) { + localLocks.delete(key); + } + } + + let locksOwnedByMe = 0; + for (const value of localLocks.values()) { + if (value.owner === this.getLocalOwnerToken()) { + locksOwnedByMe++; + } + } + + return { activeLocksCount: localLocks.size, locksOwnedByMe }; } /** diff --git a/server/private/lib/lock.ts b/server/private/lib/lock.ts index a59bbc051..26577b2b0 100644 --- a/server/private/lib/lock.ts +++ b/server/private/lib/lock.ts @@ -11,14 +11,31 @@ * This file is not licensed under the AGPLv3. */ -import { config } from "@server/lib/config"; import logger from "@server/logger"; import { redis } from "#private/lib/redis"; import { v4 as uuidv4 } from "uuid"; const instanceId = uuidv4(); +type LocalLockRecord = { + owner: string; + expiresAt: number; +}; + +const localLocks = new Map(); + export class LockManager { + private clearExpiredLocalLock(lockKey: string): void { + const current = localLocks.get(lockKey); + if (current && current.expiresAt <= Date.now()) { + localLocks.delete(lockKey); + } + } + + private getLocalOwnerToken(): string { + return `${instanceId}:`; + } + /** * Acquire a distributed lock using Redis SET with NX and PX options * @param lockKey - Unique identifier for the lock @@ -32,12 +49,34 @@ export class LockManager { retryDelayMs: number = 100 ): Promise { if (!redis || !redis.status || redis.status !== "ready") { - return true; + for (let attempt = 0; attempt < maxRetries; attempt++) { + this.clearExpiredLocalLock(lockKey); + + const existing = localLocks.get(lockKey); + if (!existing) { + localLocks.set(lockKey, { + owner: this.getLocalOwnerToken(), + expiresAt: Date.now() + ttlMs + }); + return true; + } + + if (existing.owner === this.getLocalOwnerToken()) { + existing.expiresAt = Date.now() + ttlMs; + localLocks.set(lockKey, existing); + return true; + } + + if (attempt < maxRetries - 1) { + const delay = retryDelayMs * Math.pow(2, attempt); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + return false; } - const lockValue = `${ - instanceId - }:${Date.now()}`; + const lockValue = `${instanceId}:${Date.now()}`; const redisKey = `lock:${lockKey}`; for (let attempt = 0; attempt < maxRetries; attempt++) { @@ -53,11 +92,7 @@ export class LockManager { ); if (result === "OK") { - logger.debug( - `Lock acquired: ${lockKey} by ${ - instanceId - }` - ); + logger.debug(`Lock acquired: ${lockKey} by ${instanceId}`); return true; } @@ -65,17 +100,11 @@ export class LockManager { const existingValue = await redis.get(redisKey); if ( existingValue && - existingValue.startsWith( - `${instanceId}:` - ) + existingValue.startsWith(`${instanceId}:`) ) { // Extend the lock TTL since it's the same worker await redis.pexpire(redisKey, ttlMs); - logger.debug( - `Lock extended: ${lockKey} by ${ - instanceId - }` - ); + logger.debug(`Lock extended: ${lockKey} by ${instanceId}`); return true; } @@ -88,7 +117,10 @@ export class LockManager { await new Promise((resolve) => setTimeout(resolve, delay)); } } catch (error) { - logger.error(`Failed to acquire lock ${lockKey} (attempt ${attempt + 1}/${maxRetries}):`, error); + logger.error( + `Failed to acquire lock ${lockKey} (attempt ${attempt + 1}/${maxRetries}):`, + error + ); // On error, still retry if we have attempts left if (attempt < maxRetries - 1) { const delay = retryDelayMs * Math.pow(2, attempt); @@ -109,6 +141,11 @@ export class LockManager { */ async releaseLock(lockKey: string): Promise { if (!redis || !redis.status || redis.status !== "ready") { + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + if (existing && existing.owner === this.getLocalOwnerToken()) { + localLocks.delete(lockKey); + } return; } @@ -136,11 +173,7 @@ export class LockManager { )) as number; if (result === 1) { - logger.debug( - `Lock released: ${lockKey} by ${ - instanceId - }` - ); + logger.debug(`Lock released: ${lockKey} by ${instanceId}`); } else { logger.warn( `Lock not released - not owned by worker: ${lockKey} by ${ @@ -159,6 +192,7 @@ export class LockManager { */ async forceReleaseLock(lockKey: string): Promise { if (!redis || !redis.status || redis.status !== "ready") { + localLocks.delete(lockKey); return; } @@ -186,7 +220,20 @@ export class LockManager { owner?: string; }> { if (!redis || !redis.status || redis.status !== "ready") { - return { exists: false, ownedByMe: true, ttl: 0 }; + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + + if (!existing) { + return { exists: false, ownedByMe: false, ttl: 0 }; + } + + const ttl = Math.max(0, existing.expiresAt - Date.now()); + return { + exists: true, + ownedByMe: existing.owner === this.getLocalOwnerToken(), + ttl, + owner: existing.owner.split(":")[0] + }; } const redisKey = `lock:${lockKey}`; @@ -198,11 +245,7 @@ export class LockManager { ]); const exists = value !== null; - const ownedByMe = - exists && - value!.startsWith( - `${instanceId}:` - ); + const ownedByMe = exists && value!.startsWith(`${instanceId}:`); const owner = exists ? value!.split(":")[0] : undefined; return { @@ -225,6 +268,15 @@ export class LockManager { */ async extendLock(lockKey: string, ttlMs: number): Promise { if (!redis || !redis.status || redis.status !== "ready") { + this.clearExpiredLocalLock(lockKey); + const existing = localLocks.get(lockKey); + + if (!existing || existing.owner !== this.getLocalOwnerToken()) { + return false; + } + + existing.expiresAt = Date.now() + ttlMs; + localLocks.set(lockKey, existing); return true; } @@ -255,9 +307,7 @@ export class LockManager { if (result === 1) { logger.debug( - `Lock extended: ${lockKey} by ${ - instanceId - } for ${ttlMs}ms` + `Lock extended: ${lockKey} by ${instanceId} for ${ttlMs}ms` ); return true; } @@ -282,12 +332,13 @@ export class LockManager { maxRetries: number = 5, baseDelayMs: number = 100 ): Promise { - if (!redis || !redis.status || redis.status !== "ready") { - return true; - } - for (let attempt = 0; attempt <= maxRetries; attempt++) { - const acquired = await this.acquireLock(lockKey, ttlMs); + const acquired = await this.acquireLock( + lockKey, + ttlMs, + 1, + baseDelayMs + ); if (acquired) { return true; @@ -319,10 +370,6 @@ export class LockManager { fn: () => Promise, ttlMs: number = 30000 ): Promise { - if (!redis || !redis.status || redis.status !== "ready") { - return await fn(); - } - const acquired = await this.acquireLock(lockKey, ttlMs); if (!acquired) { @@ -346,7 +393,21 @@ export class LockManager { locksOwnedByMe: number; }> { if (!redis || !redis.status || redis.status !== "ready") { - return { activeLocksCount: 0, locksOwnedByMe: 0 }; + const now = Date.now(); + for (const [key, value] of localLocks.entries()) { + if (value.expiresAt <= now) { + localLocks.delete(key); + } + } + + let locksOwnedByMe = 0; + for (const value of localLocks.values()) { + if (value.owner === this.getLocalOwnerToken()) { + locksOwnedByMe++; + } + } + + return { activeLocksCount: localLocks.size, locksOwnedByMe }; } try { @@ -356,11 +417,7 @@ export class LockManager { if (keys.length > 0) { const values = await redis.mget(...keys); locksOwnedByMe = values.filter( - (value) => - value && - value.startsWith( - `${instanceId}:` - ) + (value) => value && value.startsWith(`${instanceId}:`) ).length; }