Compare commits

...

2 Commits

Author SHA1 Message Date
Owen
f213e81cde Reconnect newts when a exit node comes back online 2026-06-08 11:12:14 -07:00
Owen
7b6b1f0a4c Add exit node if the sites dont have one 2026-06-08 11:11:46 -07:00
5 changed files with 297 additions and 3 deletions

View File

@@ -0,0 +1,202 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import axios from "axios";
import { db, exitNodes, newts, sites } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
import redisManager from "#private/lib/redis";
import { sendToClient } from "#private/routers/ws";
const INITIAL_DELAY_MS = 15 * 1000; // 15 seconds before first check
const CHECK_INTERVAL_MS = 10 * 1000; // Check every 10 seconds
const MAX_DURATION_MS = 5 * 60 * 1000; // Give up after 5 minutes
const REDIS_PENDING_SET = "exit-node-reconnect-pending";
const REDIS_HASH_PREFIX = "exit-node-reconnect:";
interface PendingReconnect {
startTime: number;
reachableAt: string;
}
// In-memory tracking for this node
const pendingReconnects = new Map<number, PendingReconnect>();
let schedulerInterval: NodeJS.Timeout | null = null;
/**
* Schedules a reconnect check for newts connected to the given exit node.
* Called when an exit node transitions from offline to online.
*/
export async function scheduleExitNodeReconnect(
exitNodeId: number,
reachableAt: string
): Promise<void> {
logger.info(
`Scheduling newt reconnect for exit node ${exitNodeId} (reachableAt: ${reachableAt})`
);
const entry: PendingReconnect = {
startTime: Date.now(),
reachableAt
};
pendingReconnects.set(exitNodeId, entry);
// Store in Redis if available for cross-node coordination
if (redisManager.isRedisEnabled()) {
await redisManager.sadd(REDIS_PENDING_SET, exitNodeId.toString());
await redisManager.hset(
`${REDIS_HASH_PREFIX}${exitNodeId}`,
"startTime",
entry.startTime.toString()
);
await redisManager.hset(
`${REDIS_HASH_PREFIX}${exitNodeId}`,
"reachableAt",
reachableAt
);
}
}
/**
* Starts the background interval that checks pending exit node reconnects.
*/
export function startExitNodeReconnectScheduler(): void {
if (schedulerInterval) {
return;
}
schedulerInterval = setInterval(async () => {
try {
await processPendingReconnects();
} catch (error) {
logger.error("Error in exit node reconnect scheduler", { error });
}
}, CHECK_INTERVAL_MS);
logger.debug("Started exit node reconnect scheduler");
}
async function processPendingReconnects(): Promise<void> {
// Merge in-memory and Redis-tracked pending reconnects
const toProcess = new Map(pendingReconnects);
if (redisManager.isRedisEnabled()) {
const redisIds = await redisManager.smembers(REDIS_PENDING_SET);
for (const idStr of redisIds) {
const id = parseInt(idStr, 10);
if (!toProcess.has(id)) {
const startTimeStr = await redisManager.hget(
`${REDIS_HASH_PREFIX}${id}`,
"startTime"
);
const reachableAt = await redisManager.hget(
`${REDIS_HASH_PREFIX}${id}`,
"reachableAt"
);
if (startTimeStr && reachableAt) {
toProcess.set(id, {
startTime: parseInt(startTimeStr, 10),
reachableAt
});
}
}
}
}
const now = Date.now();
for (const [exitNodeId, entry] of toProcess) {
const elapsed = now - entry.startTime;
// Give up after max duration
if (elapsed >= MAX_DURATION_MS) {
logger.warn(
`Exit node reconnect check timed out for exit node ${exitNodeId} after 5 minutes`
);
await removePending(exitNodeId);
continue;
}
// Respect initial delay
if (elapsed < INITIAL_DELAY_MS) {
continue;
}
// Check if the exit node HTTP endpoint is reachable
const pingUrl = `${entry.reachableAt}/ping`;
try {
await axios.get(pingUrl, { timeout: 5000 });
} catch {
logger.debug(
`Exit node ${exitNodeId} not yet reachable at ${pingUrl}`
);
continue;
}
// Node is reachable — send reconnect to all connected newts
logger.info(
`Exit node ${exitNodeId} is reachable. Sending newt/wg/reconnect to connected newts.`
);
await sendReconnectToNewts(exitNodeId);
await removePending(exitNodeId);
}
}
async function sendReconnectToNewts(exitNodeId: number): Promise<void> {
try {
const connectedNewts = await db
.select({ newtId: newts.newtId })
.from(newts)
.innerJoin(sites, eq(newts.siteId, sites.siteId))
.where(eq(sites.exitNodeId, exitNodeId));
if (connectedNewts.length === 0) {
logger.debug(
`No newts found for exit node ${exitNodeId}, nothing to reconnect`
);
return;
}
logger.info(
`Sending newt/wg/reconnect to ${connectedNewts.length} newt(s) for exit node ${exitNodeId}`
);
const reconnectMessage = {
type: "newt/wg/reconnect",
data: {}
};
await Promise.allSettled(
connectedNewts.map(({ newtId }) =>
sendToClient(newtId, reconnectMessage)
)
);
} catch (error) {
logger.error(
`Failed to send reconnect messages for exit node ${exitNodeId}`,
{ error }
);
}
}
async function removePending(exitNodeId: number): Promise<void> {
pendingReconnects.delete(exitNodeId);
if (redisManager.isRedisEnabled()) {
await redisManager.srem(REDIS_PENDING_SET, exitNodeId.toString());
await redisManager.del(`${REDIS_HASH_PREFIX}${exitNodeId}`);
}
}

View File

@@ -16,6 +16,7 @@ import { MessageHandler } from "@server/routers/ws";
import { RemoteExitNode } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
import { scheduleExitNodeReconnect } from "./exitNodeReconnectScheduler";
/**
* Handles ping messages from clients and responds with pong
@@ -37,6 +38,13 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
}
try {
// Fetch the current state before updating so we can detect the offline→online transition
const [currentExitNode] = await db
.select({ online: exitNodes.online, reachableAt: exitNodes.reachableAt })
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId))
.limit(1);
// Update the exit node's last ping timestamp
await db
.update(exitNodes)
@@ -45,6 +53,16 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
online: true
})
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId));
// If the exit node was offline and is now coming online, schedule newt reconnects
if (currentExitNode && !currentExitNode.online && currentExitNode.reachableAt) {
scheduleExitNodeReconnect(
remoteExitNode.exitNodeId,
currentExitNode.reachableAt
).catch((error) => {
logger.error("Failed to schedule exit node reconnect", { error });
});
}
} catch (error) {
logger.error("Error handling ping message", { error });
}

View File

@@ -22,3 +22,4 @@ export * from "./listRemoteExitNodes";
export * from "./pickRemoteExitNodeDefaults";
export * from "./quickStartRemoteExitNode";
export * from "./offlineChecker";
export * from "./exitNodeReconnectScheduler";

View File

@@ -14,7 +14,8 @@
import {
handleRemoteExitNodeRegisterMessage,
handleRemoteExitNodePingMessage,
startRemoteExitNodeOfflineChecker
startRemoteExitNodeOfflineChecker,
startExitNodeReconnectScheduler
} from "#private/routers/remoteExitNode";
import { MessageHandler } from "@server/routers/ws";
import { build } from "@server/build";
@@ -29,4 +30,5 @@ export const messageHandlers: Record<string, MessageHandler> = {
if (build != "saas") {
startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
startExitNodeReconnectScheduler(); // check pending exit node reconnects and notify newts
}

View File

@@ -13,7 +13,7 @@ import {
import { olms } from "@server/db";
import HttpCode from "@server/types/HttpCode";
import response from "@server/lib/response";
import { and, eq, inArray } from "drizzle-orm";
import { and, count, eq, inArray } from "drizzle-orm";
import { NextFunction, Request, Response } from "express";
import createHttpError from "http-errors";
import { z } from "zod";
@@ -24,6 +24,7 @@ import {
EXPIRES
} from "@server/auth/sessions/olm";
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
import { listExitNodes } from "#dynamic/lib/exitNodes";
import { verifyPassword } from "@server/auth/password";
import logger from "@server/logger";
import config from "@server/lib/config";
@@ -150,6 +151,7 @@ export async function getOlmToken(
);
let clientIdToUse;
let orgIdToUse: string;
if (orgId) {
// we did provide the org
const [client] = await db
@@ -183,6 +185,7 @@ export async function getOlmToken(
}
clientIdToUse = client.clientId;
orgIdToUse = orgId;
} else {
if (!existingOlm.clientId) {
return next(
@@ -209,6 +212,7 @@ export async function getOlmToken(
}
clientIdToUse = client.clientId;
orgIdToUse = client.orgId;
}
// Get all exit nodes from sites where the client has peers
@@ -265,7 +269,7 @@ export async function getOlmToken(
}
}
const exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
let exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
return {
publicKey: exitNode.publicKey,
relayPort: config.getRawConfig().gerbil.clients_start_port,
@@ -274,6 +278,73 @@ export async function getOlmToken(
};
});
// If no exit nodes were found for the client's sites, fall back to
// finding an available node in the same region (as newt does on ping).
if (exitNodesHpData.length === 0) {
logger.debug(
`No exit nodes found for olm ${olmId} client sites; falling back to region node selection`
);
const fallbackNodes = await listExitNodes(orgIdToUse!, true);
const weightedNodes = await Promise.all(
fallbackNodes.map(async (node) => {
let weight = 1;
const maxConnections = node.maxConnections;
if (
maxConnections !== null &&
maxConnections !== undefined
) {
const [currentConnections] = await db
.select({ count: count() })
.from(sites)
.where(
and(
eq(sites.exitNodeId, node.exitNodeId),
eq(sites.online, true)
)
);
if (currentConnections.count >= maxConnections) {
return null;
}
weight =
(maxConnections - currentConnections.count) /
maxConnections;
}
return { node, weight };
})
);
const availableNodes = weightedNodes
.filter(
(
n
): n is {
node: (typeof fallbackNodes)[0];
weight: number;
} => n !== null
)
.sort((a, b) => b.weight - a.weight);
if (availableNodes.length > 0) {
const best = availableNodes[0].node;
exitNodesHpData = [
{
publicKey: best.publicKey,
relayPort:
config.getRawConfig().gerbil.clients_start_port,
endpoint: best.endpoint,
siteIds: []
// it should still HP without the site ids but it will get stuck in the client
// if a site is removed or something because its not tied to a site which is okay for the session
}
];
} else {
logger.warn(
`No available fallback exit nodes found for olm ${olmId}`
);
}
}
logger.debug("Token created successfully");
return response<{