mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-09 00:54:43 +00:00
Add resource column to hc and remove —
This commit is contained in:
@@ -88,11 +88,11 @@ async function dbQueryRows<T extends Record<string, unknown>>(
|
||||
): Promise<T[]> {
|
||||
const anyDb = db as any;
|
||||
if (typeof anyDb.execute === "function") {
|
||||
// PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
|
||||
// PostgreSQL (node-postgres via Drizzle) - returns { rows: [...] } or an array
|
||||
const result = await anyDb.execute(query);
|
||||
return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
|
||||
}
|
||||
// SQLite (better-sqlite3 via Drizzle) — returns an array directly
|
||||
// SQLite (better-sqlite3 via Drizzle) - returns an array directly
|
||||
return (await anyDb.all(query)) as T[];
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ function isSQLite(): boolean {
|
||||
* Swaps out the accumulator before writing so that any bandwidth messages
|
||||
* received during the flush are captured in the new accumulator rather than
|
||||
* being lost or causing contention. Sites are updated in chunks via a single
|
||||
* batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
|
||||
* batch UPDATE per chunk. Failed chunks are discarded - exact per-flush
|
||||
* accuracy is not critical and re-queuing is not worth the added complexity.
|
||||
*
|
||||
* This function is exported so that the application's graceful-shutdown
|
||||
@@ -125,7 +125,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
const currentTime = new Date().toISOString();
|
||||
|
||||
// Sort by publicKey for consistent lock ordering across concurrent
|
||||
// writers — deadlock-prevention strategy.
|
||||
// writers - deadlock-prevention strategy.
|
||||
const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
|
||||
a.localeCompare(b)
|
||||
);
|
||||
@@ -150,7 +150,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
try {
|
||||
rows = await withDeadlockRetry(async () => {
|
||||
if (isSQLite()) {
|
||||
// SQLite: one UPDATE per row — no need for batch efficiency here.
|
||||
// SQLite: one UPDATE per row - no need for batch efficiency here.
|
||||
const results: { orgId: string; pubKey: string }[] = [];
|
||||
for (const [publicKey, { bytesIn, bytesOut }] of chunk) {
|
||||
const result = await dbQueryRows<{
|
||||
@@ -170,7 +170,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
return results;
|
||||
}
|
||||
|
||||
// PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk.
|
||||
// PostgreSQL: batch UPDATE … FROM (VALUES …) - single round-trip per chunk.
|
||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
||||
);
|
||||
@@ -191,7 +191,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
`Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`,
|
||||
error
|
||||
);
|
||||
// Discard the chunk — exact per-flush accuracy is not critical.
|
||||
// Discard the chunk - exact per-flush accuracy is not critical.
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -232,7 +232,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
totalBandwidth
|
||||
);
|
||||
if (bandwidthUsage) {
|
||||
// Fire-and-forget — don't block the flush on limit checking.
|
||||
// Fire-and-forget - don't block the flush on limit checking.
|
||||
usageService
|
||||
.checkLimitSet(
|
||||
orgId,
|
||||
@@ -298,7 +298,7 @@ export async function updateSiteBandwidth(
|
||||
exitNodeId?: number
|
||||
): Promise<void> {
|
||||
for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
|
||||
// Skip peers that haven't transferred any data — writing zeros to the
|
||||
// Skip peers that haven't transferred any data - writing zeros to the
|
||||
// database would be a no-op anyway.
|
||||
if (bytesIn <= 0 && bytesOut <= 0) {
|
||||
continue;
|
||||
|
||||
@@ -19,6 +19,9 @@ export type ListHealthChecksResponse = {
|
||||
hcTlsServerName: string | null;
|
||||
hcHealthyThreshold: number | null;
|
||||
hcUnhealthyThreshold: number | null;
|
||||
resourceId: number | null;
|
||||
resourceName: string | null;
|
||||
resourceNiceId: string | null;
|
||||
}[];
|
||||
pagination: {
|
||||
total: number;
|
||||
|
||||
@@ -88,7 +88,7 @@ export async function flushBandwidthToDb(): Promise<void> {
|
||||
const currentTime = new Date().toISOString();
|
||||
|
||||
// Sort by publicKey for consistent lock ordering across concurrent
|
||||
// writers — this is the same deadlock-prevention strategy used in the
|
||||
// writers - this is the same deadlock-prevention strategy used in the
|
||||
// original per-message implementation.
|
||||
const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
|
||||
a.localeCompare(b)
|
||||
@@ -143,7 +143,7 @@ const flushTimer = setInterval(async () => {
|
||||
}, FLUSH_INTERVAL_MS);
|
||||
|
||||
// Calling unref() means this timer will not keep the Node.js event loop alive
|
||||
// on its own — the process can still exit normally when there is no other work
|
||||
// on its own - the process can still exit normally when there is no other work
|
||||
// left. The graceful-shutdown path (see server/cleanup.ts) will call
|
||||
// flushBandwidthToDb() explicitly before process.exit(), so no data is lost.
|
||||
flushTimer.unref();
|
||||
@@ -167,7 +167,7 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (
|
||||
// Accumulate the incoming data in memory; the periodic timer (and the
|
||||
// shutdown hook) will take care of writing it to the database.
|
||||
for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
|
||||
// Skip peers that haven't transferred any data — writing zeros to the
|
||||
// Skip peers that haven't transferred any data - writing zeros to the
|
||||
// database would be a no-op anyway.
|
||||
if (bytesIn <= 0 && bytesOut <= 0) {
|
||||
continue;
|
||||
|
||||
@@ -16,7 +16,7 @@ const OFFLINE_THRESHOLD_BANDWIDTH_MS = 8 * 60 * 1000; // 8 minutes
|
||||
* Starts the background interval that checks for newt sites that haven't
|
||||
* pinged recently and marks them as offline. For backward compatibility,
|
||||
* a site is only marked offline when there is no active WebSocket connection
|
||||
* either — so older newt versions that don't send pings but remain connected
|
||||
* either - so older newt versions that don't send pings but remain connected
|
||||
* continue to be treated as online.
|
||||
*/
|
||||
export const startNewtOfflineChecker = (): void => {
|
||||
@@ -63,7 +63,7 @@ export const startNewtOfflineChecker = (): void => {
|
||||
);
|
||||
if (isConnected) {
|
||||
logger.debug(
|
||||
`Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket — keeping site ${staleSite.siteId} online`
|
||||
`Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket - keeping site ${staleSite.siteId} online`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
|
||||
try {
|
||||
const newlyOnlineSites = await withRetry(async () => {
|
||||
// Only update sites that were offline — these are the
|
||||
// Only update sites that were offline - these are the
|
||||
// offline→online transitions. .returning() gives us exactly
|
||||
// the site IDs that changed state.
|
||||
const transitioned = await db
|
||||
@@ -249,7 +249,7 @@ async function flushClientPingsToDb(): Promise<void> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush everything — called by the interval timer and during shutdown.
|
||||
* Flush everything - called by the interval timer and during shutdown.
|
||||
*/
|
||||
export async function flushPingsToDb(): Promise<void> {
|
||||
await flushSitePingsToDb();
|
||||
@@ -314,7 +314,7 @@ function isTransientError(error: any): boolean {
|
||||
return true;
|
||||
}
|
||||
|
||||
// PostgreSQL deadlock detected — always safe to retry (one winner guaranteed)
|
||||
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
|
||||
if (code === "40P01" || message.includes("deadlock")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -249,7 +249,7 @@ export async function registerNewt(
|
||||
dateCreated: moment().toISOString()
|
||||
});
|
||||
|
||||
// Consume the provisioning key — cascade removes siteProvisioningKeyOrg
|
||||
// Consume the provisioning key - cascade removes siteProvisioningKeyOrg
|
||||
await trx
|
||||
.update(siteProvisioningKeys)
|
||||
.set({
|
||||
|
||||
@@ -211,7 +211,7 @@ export const handleOlmServerInitAddPeerHandshake: MessageHandler = async (
|
||||
continue;
|
||||
}
|
||||
|
||||
// Trigger the peer add handshake — if the peer was already added this will be a no-op
|
||||
// Trigger the peer add handshake - if the peer was already added this will be a no-op
|
||||
await initPeerAddHandshake(
|
||||
client.clientId,
|
||||
{
|
||||
@@ -236,4 +236,4 @@ export const handleOlmServerInitAddPeerHandshake: MessageHandler = async (
|
||||
}
|
||||
|
||||
return;
|
||||
};
|
||||
};
|
||||
|
||||
@@ -228,7 +228,7 @@ export async function createTarget(
|
||||
healthCheck = await db
|
||||
.insert(targetHealthCheck)
|
||||
.values({
|
||||
name: `${targetData.ip}:${targetData.port}`,
|
||||
orgId: resource.orgId,
|
||||
targetId: newTarget[0].targetId,
|
||||
hcEnabled: targetData.hcEnabled ?? false,
|
||||
hcPath: targetData.hcPath ?? null,
|
||||
|
||||
@@ -47,7 +47,7 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
||||
"ws/round-trip/complete": handleRoundTripMessage
|
||||
};
|
||||
|
||||
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
||||
// Start the ping accumulator for all builds - it batches per-site online/lastPing
|
||||
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
||||
startPingAccumulator();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user