mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-11 01:53:58 +00:00
Compare commits
2 Commits
dependabot
...
exit-node-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f213e81cde | ||
|
|
7b6b1f0a4c |
@@ -0,0 +1,202 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025-2026 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import axios from "axios";
|
||||
import { db, exitNodes, newts, sites } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import redisManager from "#private/lib/redis";
|
||||
import { sendToClient } from "#private/routers/ws";
|
||||
|
||||
const INITIAL_DELAY_MS = 15 * 1000; // 15 seconds before first check
|
||||
const CHECK_INTERVAL_MS = 10 * 1000; // Check every 10 seconds
|
||||
const MAX_DURATION_MS = 5 * 60 * 1000; // Give up after 5 minutes
|
||||
const REDIS_PENDING_SET = "exit-node-reconnect-pending";
|
||||
const REDIS_HASH_PREFIX = "exit-node-reconnect:";
|
||||
|
||||
interface PendingReconnect {
|
||||
startTime: number;
|
||||
reachableAt: string;
|
||||
}
|
||||
|
||||
// In-memory tracking for this node
|
||||
const pendingReconnects = new Map<number, PendingReconnect>();
|
||||
|
||||
let schedulerInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
/**
|
||||
* Schedules a reconnect check for newts connected to the given exit node.
|
||||
* Called when an exit node transitions from offline to online.
|
||||
*/
|
||||
export async function scheduleExitNodeReconnect(
|
||||
exitNodeId: number,
|
||||
reachableAt: string
|
||||
): Promise<void> {
|
||||
logger.info(
|
||||
`Scheduling newt reconnect for exit node ${exitNodeId} (reachableAt: ${reachableAt})`
|
||||
);
|
||||
|
||||
const entry: PendingReconnect = {
|
||||
startTime: Date.now(),
|
||||
reachableAt
|
||||
};
|
||||
|
||||
pendingReconnects.set(exitNodeId, entry);
|
||||
|
||||
// Store in Redis if available for cross-node coordination
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.sadd(REDIS_PENDING_SET, exitNodeId.toString());
|
||||
await redisManager.hset(
|
||||
`${REDIS_HASH_PREFIX}${exitNodeId}`,
|
||||
"startTime",
|
||||
entry.startTime.toString()
|
||||
);
|
||||
await redisManager.hset(
|
||||
`${REDIS_HASH_PREFIX}${exitNodeId}`,
|
||||
"reachableAt",
|
||||
reachableAt
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the background interval that checks pending exit node reconnects.
|
||||
*/
|
||||
export function startExitNodeReconnectScheduler(): void {
|
||||
if (schedulerInterval) {
|
||||
return;
|
||||
}
|
||||
|
||||
schedulerInterval = setInterval(async () => {
|
||||
try {
|
||||
await processPendingReconnects();
|
||||
} catch (error) {
|
||||
logger.error("Error in exit node reconnect scheduler", { error });
|
||||
}
|
||||
}, CHECK_INTERVAL_MS);
|
||||
|
||||
logger.debug("Started exit node reconnect scheduler");
|
||||
}
|
||||
|
||||
async function processPendingReconnects(): Promise<void> {
|
||||
// Merge in-memory and Redis-tracked pending reconnects
|
||||
const toProcess = new Map(pendingReconnects);
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
const redisIds = await redisManager.smembers(REDIS_PENDING_SET);
|
||||
for (const idStr of redisIds) {
|
||||
const id = parseInt(idStr, 10);
|
||||
if (!toProcess.has(id)) {
|
||||
const startTimeStr = await redisManager.hget(
|
||||
`${REDIS_HASH_PREFIX}${id}`,
|
||||
"startTime"
|
||||
);
|
||||
const reachableAt = await redisManager.hget(
|
||||
`${REDIS_HASH_PREFIX}${id}`,
|
||||
"reachableAt"
|
||||
);
|
||||
if (startTimeStr && reachableAt) {
|
||||
toProcess.set(id, {
|
||||
startTime: parseInt(startTimeStr, 10),
|
||||
reachableAt
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
for (const [exitNodeId, entry] of toProcess) {
|
||||
const elapsed = now - entry.startTime;
|
||||
|
||||
// Give up after max duration
|
||||
if (elapsed >= MAX_DURATION_MS) {
|
||||
logger.warn(
|
||||
`Exit node reconnect check timed out for exit node ${exitNodeId} after 5 minutes`
|
||||
);
|
||||
await removePending(exitNodeId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Respect initial delay
|
||||
if (elapsed < INITIAL_DELAY_MS) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if the exit node HTTP endpoint is reachable
|
||||
const pingUrl = `${entry.reachableAt}/ping`;
|
||||
try {
|
||||
await axios.get(pingUrl, { timeout: 5000 });
|
||||
} catch {
|
||||
logger.debug(
|
||||
`Exit node ${exitNodeId} not yet reachable at ${pingUrl}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Node is reachable — send reconnect to all connected newts
|
||||
logger.info(
|
||||
`Exit node ${exitNodeId} is reachable. Sending newt/wg/reconnect to connected newts.`
|
||||
);
|
||||
|
||||
await sendReconnectToNewts(exitNodeId);
|
||||
await removePending(exitNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
async function sendReconnectToNewts(exitNodeId: number): Promise<void> {
|
||||
try {
|
||||
const connectedNewts = await db
|
||||
.select({ newtId: newts.newtId })
|
||||
.from(newts)
|
||||
.innerJoin(sites, eq(newts.siteId, sites.siteId))
|
||||
.where(eq(sites.exitNodeId, exitNodeId));
|
||||
|
||||
if (connectedNewts.length === 0) {
|
||||
logger.debug(
|
||||
`No newts found for exit node ${exitNodeId}, nothing to reconnect`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Sending newt/wg/reconnect to ${connectedNewts.length} newt(s) for exit node ${exitNodeId}`
|
||||
);
|
||||
|
||||
const reconnectMessage = {
|
||||
type: "newt/wg/reconnect",
|
||||
data: {}
|
||||
};
|
||||
|
||||
await Promise.allSettled(
|
||||
connectedNewts.map(({ newtId }) =>
|
||||
sendToClient(newtId, reconnectMessage)
|
||||
)
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to send reconnect messages for exit node ${exitNodeId}`,
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async function removePending(exitNodeId: number): Promise<void> {
|
||||
pendingReconnects.delete(exitNodeId);
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.srem(REDIS_PENDING_SET, exitNodeId.toString());
|
||||
await redisManager.del(`${REDIS_HASH_PREFIX}${exitNodeId}`);
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import { MessageHandler } from "@server/routers/ws";
|
||||
import { RemoteExitNode } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { scheduleExitNodeReconnect } from "./exitNodeReconnectScheduler";
|
||||
|
||||
/**
|
||||
* Handles ping messages from clients and responds with pong
|
||||
@@ -37,6 +38,13 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
|
||||
}
|
||||
|
||||
try {
|
||||
// Fetch the current state before updating so we can detect the offline→online transition
|
||||
const [currentExitNode] = await db
|
||||
.select({ online: exitNodes.online, reachableAt: exitNodes.reachableAt })
|
||||
.from(exitNodes)
|
||||
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId))
|
||||
.limit(1);
|
||||
|
||||
// Update the exit node's last ping timestamp
|
||||
await db
|
||||
.update(exitNodes)
|
||||
@@ -45,6 +53,16 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
|
||||
online: true
|
||||
})
|
||||
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId));
|
||||
|
||||
// If the exit node was offline and is now coming online, schedule newt reconnects
|
||||
if (currentExitNode && !currentExitNode.online && currentExitNode.reachableAt) {
|
||||
scheduleExitNodeReconnect(
|
||||
remoteExitNode.exitNodeId,
|
||||
currentExitNode.reachableAt
|
||||
).catch((error) => {
|
||||
logger.error("Failed to schedule exit node reconnect", { error });
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error("Error handling ping message", { error });
|
||||
}
|
||||
|
||||
@@ -22,3 +22,4 @@ export * from "./listRemoteExitNodes";
|
||||
export * from "./pickRemoteExitNodeDefaults";
|
||||
export * from "./quickStartRemoteExitNode";
|
||||
export * from "./offlineChecker";
|
||||
export * from "./exitNodeReconnectScheduler";
|
||||
|
||||
@@ -14,7 +14,8 @@
|
||||
import {
|
||||
handleRemoteExitNodeRegisterMessage,
|
||||
handleRemoteExitNodePingMessage,
|
||||
startRemoteExitNodeOfflineChecker
|
||||
startRemoteExitNodeOfflineChecker,
|
||||
startExitNodeReconnectScheduler
|
||||
} from "#private/routers/remoteExitNode";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { build } from "@server/build";
|
||||
@@ -29,4 +30,5 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
||||
|
||||
if (build != "saas") {
|
||||
startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
|
||||
startExitNodeReconnectScheduler(); // check pending exit node reconnects and notify newts
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ import {
|
||||
import { olms } from "@server/db";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import response from "@server/lib/response";
|
||||
import { and, eq, inArray } from "drizzle-orm";
|
||||
import { and, count, eq, inArray } from "drizzle-orm";
|
||||
import { NextFunction, Request, Response } from "express";
|
||||
import createHttpError from "http-errors";
|
||||
import { z } from "zod";
|
||||
@@ -24,6 +24,7 @@ import {
|
||||
EXPIRES
|
||||
} from "@server/auth/sessions/olm";
|
||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||
import { listExitNodes } from "#dynamic/lib/exitNodes";
|
||||
import { verifyPassword } from "@server/auth/password";
|
||||
import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
@@ -150,6 +151,7 @@ export async function getOlmToken(
|
||||
);
|
||||
|
||||
let clientIdToUse;
|
||||
let orgIdToUse: string;
|
||||
if (orgId) {
|
||||
// we did provide the org
|
||||
const [client] = await db
|
||||
@@ -183,6 +185,7 @@ export async function getOlmToken(
|
||||
}
|
||||
|
||||
clientIdToUse = client.clientId;
|
||||
orgIdToUse = orgId;
|
||||
} else {
|
||||
if (!existingOlm.clientId) {
|
||||
return next(
|
||||
@@ -209,6 +212,7 @@ export async function getOlmToken(
|
||||
}
|
||||
|
||||
clientIdToUse = client.clientId;
|
||||
orgIdToUse = client.orgId;
|
||||
}
|
||||
|
||||
// Get all exit nodes from sites where the client has peers
|
||||
@@ -265,7 +269,7 @@ export async function getOlmToken(
|
||||
}
|
||||
}
|
||||
|
||||
const exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
|
||||
let exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
|
||||
return {
|
||||
publicKey: exitNode.publicKey,
|
||||
relayPort: config.getRawConfig().gerbil.clients_start_port,
|
||||
@@ -274,6 +278,73 @@ export async function getOlmToken(
|
||||
};
|
||||
});
|
||||
|
||||
// If no exit nodes were found for the client's sites, fall back to
|
||||
// finding an available node in the same region (as newt does on ping).
|
||||
if (exitNodesHpData.length === 0) {
|
||||
logger.debug(
|
||||
`No exit nodes found for olm ${olmId} client sites; falling back to region node selection`
|
||||
);
|
||||
const fallbackNodes = await listExitNodes(orgIdToUse!, true);
|
||||
|
||||
const weightedNodes = await Promise.all(
|
||||
fallbackNodes.map(async (node) => {
|
||||
let weight = 1;
|
||||
const maxConnections = node.maxConnections;
|
||||
if (
|
||||
maxConnections !== null &&
|
||||
maxConnections !== undefined
|
||||
) {
|
||||
const [currentConnections] = await db
|
||||
.select({ count: count() })
|
||||
.from(sites)
|
||||
.where(
|
||||
and(
|
||||
eq(sites.exitNodeId, node.exitNodeId),
|
||||
eq(sites.online, true)
|
||||
)
|
||||
);
|
||||
if (currentConnections.count >= maxConnections) {
|
||||
return null;
|
||||
}
|
||||
weight =
|
||||
(maxConnections - currentConnections.count) /
|
||||
maxConnections;
|
||||
}
|
||||
return { node, weight };
|
||||
})
|
||||
);
|
||||
|
||||
const availableNodes = weightedNodes
|
||||
.filter(
|
||||
(
|
||||
n
|
||||
): n is {
|
||||
node: (typeof fallbackNodes)[0];
|
||||
weight: number;
|
||||
} => n !== null
|
||||
)
|
||||
.sort((a, b) => b.weight - a.weight);
|
||||
|
||||
if (availableNodes.length > 0) {
|
||||
const best = availableNodes[0].node;
|
||||
exitNodesHpData = [
|
||||
{
|
||||
publicKey: best.publicKey,
|
||||
relayPort:
|
||||
config.getRawConfig().gerbil.clients_start_port,
|
||||
endpoint: best.endpoint,
|
||||
siteIds: []
|
||||
// it should still HP without the site ids but it will get stuck in the client
|
||||
// if a site is removed or something because its not tied to a site which is okay for the session
|
||||
}
|
||||
];
|
||||
} else {
|
||||
logger.warn(
|
||||
`No available fallback exit nodes found for olm ${olmId}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Token created successfully");
|
||||
|
||||
return response<{
|
||||
|
||||
Reference in New Issue
Block a user