diff --git a/server/lib/alerts/events/healthCheckEvents.ts b/server/lib/alerts/events/healthCheckEvents.ts index 500c535c9..ba79feb4b 100644 --- a/server/lib/alerts/events/healthCheckEvents.ts +++ b/server/lib/alerts/events/healthCheckEvents.ts @@ -4,7 +4,9 @@ export async function fireHealthCheckHealthyAlert( orgId: string, healthCheckId: number, healthCheckName?: string, - extra?: Record + healthCheckTargetId?: number | null, + extra?: Record, + trx?: unknown ): Promise { return; } @@ -13,7 +15,9 @@ export async function fireHealthCheckUnhealthyAlert( orgId: string, healthCheckId: number, healthCheckName?: string, - extra?: Record + healthCheckTargetId?: number | null, + extra?: Record, + trx?: unknown ): Promise { return; -} +} \ No newline at end of file diff --git a/server/lib/alerts/events/resourceEvents.ts b/server/lib/alerts/events/resourceEvents.ts index 2e90057d1..09dd7d8cf 100644 --- a/server/lib/alerts/events/resourceEvents.ts +++ b/server/lib/alerts/events/resourceEvents.ts @@ -2,19 +2,22 @@ export async function fireResourceHealthyAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record + extra?: Record, + trx?: unknown ): Promise {} export async function fireResourceUnhealthyAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record + extra?: Record, + trx?: unknown ): Promise {} export async function fireResourceToggleAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record -): Promise {} + extra?: Record, + trx?: unknown +): Promise {} \ No newline at end of file diff --git a/server/lib/alerts/events/siteEvents.ts b/server/lib/alerts/events/siteEvents.ts index 8426fa9c2..1e96951cc 100644 --- a/server/lib/alerts/events/siteEvents.ts +++ b/server/lib/alerts/events/siteEvents.ts @@ -4,7 +4,8 @@ export async function fireSiteOnlineAlert( orgId: string, siteId: number, siteName?: string, - extra?: Record + extra?: Record, + trx?: unknown ): Promise { return; } @@ -13,7 +14,8 @@ export async function fireSiteOfflineAlert( orgId: string, siteId: number, siteName?: string, - extra?: Record + extra?: Record, + trx?: unknown ): Promise { return; -} +} \ No newline at end of file diff --git a/server/private/lib/alerts/events/healthCheckEvents.ts b/server/private/lib/alerts/events/healthCheckEvents.ts index 470788ad5..c2ba25b28 100644 --- a/server/private/lib/alerts/events/healthCheckEvents.ts +++ b/server/private/lib/alerts/events/healthCheckEvents.ts @@ -18,7 +18,8 @@ import { statusHistory, targetHealthCheck, targets, - resources + resources, + Transaction } from "@server/db"; import { eq } from "drizzle-orm"; import { @@ -46,10 +47,11 @@ export async function fireHealthCheckHealthyAlert( healthCheckId: number, healthCheckName?: string | null, healthCheckTargetId?: number | null, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "health_check", entityId: healthCheckId, orgId: orgId, @@ -57,7 +59,7 @@ export async function fireHealthCheckHealthyAlert( timestamp: Math.floor(Date.now() / 1000) }); - await handleResource(orgId, healthCheckTargetId); + await handleResource(orgId, healthCheckTargetId, trx); await processAlerts({ eventType: "health_check_healthy", @@ -102,10 +104,11 @@ export async function fireHealthCheckUnhealthyAlert( healthCheckId: number, healthCheckName?: string | null, healthCheckTargetId?: number | null, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "health_check", entityId: healthCheckId, orgId: orgId, @@ -113,7 +116,7 @@ export async function fireHealthCheckUnhealthyAlert( timestamp: Math.floor(Date.now() / 1000) }); - await handleResource(orgId, healthCheckTargetId); + await handleResource(orgId, healthCheckTargetId, trx); await processAlerts({ eventType: "health_check_unhealthy", @@ -142,12 +145,12 @@ export async function fireHealthCheckUnhealthyAlert( } } -async function handleResource(orgId: string, healthCheckTargetId?: number | null) { +async function handleResource(orgId: string, healthCheckTargetId?: number | null, trx: Transaction | typeof db = db) { if (!healthCheckTargetId) { return; } // we have resources lets get them - const [target] = await db + const [target] = await trx .select() .from(targets) .where(eq(targets.targetId, healthCheckTargetId)) @@ -156,7 +159,7 @@ async function handleResource(orgId: string, healthCheckTargetId?: number | null if (!target) { return; } - const [resource] = await db + const [resource] = await trx .select() .from(resources) .where(eq(resources.resourceId, target.resourceId)) @@ -165,7 +168,7 @@ async function handleResource(orgId: string, healthCheckTargetId?: number | null if (!resource) { return; } - const otherTargets = await db + const otherTargets = await trx .select({ hcHealth: targetHealthCheck.hcHealth }) .from(targets) .where(eq(targets.resourceId, resource.resourceId)); @@ -181,7 +184,7 @@ async function handleResource(orgId: string, healthCheckTargetId?: number | null if (health != resource.health) { // it changed - await db + await trx .update(resources) .set({ health }) .where(eq(resources.resourceId, resource.resourceId)); @@ -190,13 +193,17 @@ async function handleResource(orgId: string, healthCheckTargetId?: number | null await fireResourceUnhealthyAlert( orgId, resource.resourceId, - resource.name + resource.name, + undefined, + trx ); } else if (health === "healthy") { await fireResourceHealthyAlert( orgId, resource.resourceId, - resource.name + resource.name, + undefined, + trx ); } } diff --git a/server/private/lib/alerts/events/resourceEvents.ts b/server/private/lib/alerts/events/resourceEvents.ts index 9fd351047..c2d6d3725 100644 --- a/server/private/lib/alerts/events/resourceEvents.ts +++ b/server/private/lib/alerts/events/resourceEvents.ts @@ -13,7 +13,7 @@ import logger from "@server/logger"; import { processAlerts } from "../processAlerts"; -import { db, statusHistory } from "@server/db"; +import { db, statusHistory, Transaction } from "@server/db"; // --------------------------------------------------------------------------- // Public API @@ -34,10 +34,11 @@ export async function fireResourceHealthyAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "resource", entityId: resourceId, orgId: orgId, @@ -87,10 +88,11 @@ export async function fireResourceUnhealthyAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "resource", entityId: resourceId, orgId: orgId, @@ -140,7 +142,8 @@ export async function fireResourceToggleAlert( orgId: string, resourceId: number, resourceName?: string | null, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { await processAlerts({ diff --git a/server/private/lib/alerts/events/siteEvents.ts b/server/private/lib/alerts/events/siteEvents.ts index 66c813ab9..580e00848 100644 --- a/server/private/lib/alerts/events/siteEvents.ts +++ b/server/private/lib/alerts/events/siteEvents.ts @@ -13,7 +13,7 @@ import logger from "@server/logger"; import { processAlerts } from "../processAlerts"; -import { db, sites, statusHistory, targetHealthCheck } from "@server/db"; +import { db, sites, statusHistory, targetHealthCheck, Transaction } from "@server/db"; import { and, eq, inArray } from "drizzle-orm"; import { fireHealthCheckUnhealthyAlert } from "./healthCheckEvents"; @@ -36,10 +36,11 @@ export async function fireSiteOnlineAlert( orgId: string, siteId: number, siteName?: string, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "site", entityId: siteId, orgId: orgId, @@ -89,10 +90,11 @@ export async function fireSiteOfflineAlert( orgId: string, siteId: number, siteName?: string, - extra?: Record + extra?: Record, + trx: Transaction | typeof db = db ): Promise { try { - await db.insert(statusHistory).values({ + await trx.insert(statusHistory).values({ entityType: "site", entityId: siteId, orgId: orgId, @@ -100,7 +102,7 @@ export async function fireSiteOfflineAlert( timestamp: Math.floor(Date.now() / 1000) }); - const unhealthyHealthChecks = await db + const unhealthyHealthChecks = await trx .update(targetHealthCheck) .set({ hcHealth: "unhealthy" }) .where( @@ -119,7 +121,10 @@ export async function fireSiteOfflineAlert( await fireHealthCheckUnhealthyAlert( healthCheck.orgId, healthCheck.targetHealthCheckId, - healthCheck.name + healthCheck.name, + undefined, + undefined, + trx ); } diff --git a/server/routers/newt/handleNewtDisconnectingMessage.ts b/server/routers/newt/handleNewtDisconnectingMessage.ts index 82af2e6a3..15c7d3662 100644 --- a/server/routers/newt/handleNewtDisconnectingMessage.ts +++ b/server/routers/newt/handleNewtDisconnectingMessage.ts @@ -2,10 +2,7 @@ import { MessageHandler } from "@server/routers/ws"; import { db, Newt, - sites, - statusHistory, - targetHealthCheck, - targets + sites } from "@server/db"; import { eq } from "drizzle-orm"; import logger from "@server/logger"; @@ -32,15 +29,17 @@ export const handleNewtDisconnectingMessage: MessageHandler = async ( try { // Update the client's last ping timestamp - const [site] = await db - .update(sites) - .set({ - online: false - }) - .where(eq(sites.siteId, newt.siteId)) - .returning(); + await db.transaction(async (trx) => { + const [site] = await trx + .update(sites) + .set({ + online: false + }) + .where(eq(sites.siteId, newt.siteId!)) + .returning(); - await fireSiteOfflineAlert(site.orgId, site.siteId, site.name); + await fireSiteOfflineAlert(site.orgId, site.siteId, site.name, undefined, trx); + }); } catch (error) { logger.error("Error handling disconnecting message", { error }); } diff --git a/server/routers/newt/offlineChecker.ts b/server/routers/newt/offlineChecker.ts index fe6253581..1dc51d5da 100644 --- a/server/routers/newt/offlineChecker.ts +++ b/server/routers/newt/offlineChecker.ts @@ -77,16 +77,20 @@ export const startNewtOfflineChecker = (): void => { `Marking site ${staleSite.siteId} offline: newt ${staleSite.newtId} has no recent ping and no active WebSocket connection` ); - await db - .update(sites) - .set({ online: false }) - .where(eq(sites.siteId, staleSite.siteId)); + await db.transaction(async (trx) => { + await trx + .update(sites) + .set({ online: false }) + .where(eq(sites.siteId, staleSite.siteId)); - await fireSiteOfflineAlert( - staleSite.orgId, - staleSite.siteId, - staleSite.name - ); + await fireSiteOfflineAlert( + staleSite.orgId, + staleSite.siteId, + staleSite.name, + undefined, + trx + ); + }); } // this part only effects self hosted. Its not efficient but we dont expect people to have very many wireguard sites @@ -123,16 +127,20 @@ export const startNewtOfflineChecker = (): void => { `Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes` ); - await db - .update(sites) - .set({ online: false }) - .where(eq(sites.siteId, site.siteId)); + await db.transaction(async (trx) => { + await trx + .update(sites) + .set({ online: false }) + .where(eq(sites.siteId, site.siteId)); - await fireSiteOfflineAlert( - site.orgId, - site.siteId, - site.name - ); + await fireSiteOfflineAlert( + site.orgId, + site.siteId, + site.name, + undefined, + trx + ); + }); } else if ( lastBandwidthUpdate >= wireguardOfflineThreshold && !site.online @@ -141,16 +149,20 @@ export const startNewtOfflineChecker = (): void => { `Marking wireguard site ${site.siteId} online: recent bandwidth update` ); - await db - .update(sites) - .set({ online: true }) - .where(eq(sites.siteId, site.siteId)); + await db.transaction(async (trx) => { + await trx + .update(sites) + .set({ online: true }) + .where(eq(sites.siteId, site.siteId)); - await fireSiteOnlineAlert( - site.orgId, - site.siteId, - site.name - ); + await fireSiteOnlineAlert( + site.orgId, + site.siteId, + site.name, + undefined, + trx + ); + }); } } } catch (error) { diff --git a/server/routers/newt/pingAccumulator.ts b/server/routers/newt/pingAccumulator.ts index 70d8afa9f..307565723 100644 --- a/server/routers/newt/pingAccumulator.ts +++ b/server/routers/newt/pingAccumulator.ts @@ -147,7 +147,9 @@ async function flushSitePingsToDb(): Promise { }, "flushSitePingsToDb"); for (const site of newlyOnlineSites) { - await fireSiteOnlineAlert(site.orgId, site.siteId, site.name); + await db.transaction(async (trx) => { + await fireSiteOnlineAlert(site.orgId, site.siteId, site.name, undefined, trx); + }); } } catch (error) { logger.error( diff --git a/server/routers/target/handleHealthcheckStatusMessage.ts b/server/routers/target/handleHealthcheckStatusMessage.ts index c6f2863f8..50331deb8 100644 --- a/server/routers/target/handleHealthcheckStatusMessage.ts +++ b/server/routers/target/handleHealthcheckStatusMessage.ts @@ -14,10 +14,7 @@ import { fireHealthCheckHealthyAlert, fireHealthCheckUnhealthyAlert } from "#dynamic/lib/alerts"; -import { - fireResourceHealthyAlert, - fireResourceUnhealthyAlert -} from "#dynamic/lib/alerts"; + interface TargetHealthStatus { status: string; @@ -125,33 +122,39 @@ export const handleHealthcheckStatusMessage: MessageHandler = async ( continue; } - // Update the target's health status in the database - await db - .update(targetHealthCheck) - .set({ - hcHealth: healthStatus.status as - | "unknown" - | "healthy" - | "unhealthy" - }) - .where(eq(targetHealthCheck.targetHealthCheckId, targetCheck.targetHealthCheckId)); + // Update the target's health status in the database and fire alert in a transaction + await db.transaction(async (trx) => { + await trx + .update(targetHealthCheck) + .set({ + hcHealth: healthStatus.status as + | "unknown" + | "healthy" + | "unhealthy" + }) + .where(eq(targetHealthCheck.targetHealthCheckId, targetCheck.targetHealthCheckId)); - // because we are checking above if there was a change we can fire the alert here because it changed - if (healthStatus.status === "unhealthy") { - await fireHealthCheckUnhealthyAlert( - targetCheck.orgId, - targetCheck.targetHealthCheckId, - targetCheck.name ?? undefined, - targetCheck.targetId - ); - } else if (healthStatus.status === "healthy") { - await fireHealthCheckHealthyAlert( - targetCheck.orgId, - targetCheck.targetHealthCheckId, - targetCheck.name ?? undefined, - targetCheck.targetId - ); - } + // because we are checking above if there was a change we can fire the alert here because it changed + if (healthStatus.status === "unhealthy") { + await fireHealthCheckUnhealthyAlert( + targetCheck.orgId, + targetCheck.targetHealthCheckId, + targetCheck.name ?? undefined, + targetCheck.targetId, + undefined, + trx + ); + } else if (healthStatus.status === "healthy") { + await fireHealthCheckHealthyAlert( + targetCheck.orgId, + targetCheck.targetHealthCheckId, + targetCheck.name ?? undefined, + targetCheck.targetId, + undefined, + trx + ); + } + }); logger.debug( `Updated health status for target ${targetId} to ${healthStatus.status}`