Merge branch 'dev' into feat/paginate-user-roles-table

This commit is contained in:
miloschwartz
2026-04-20 21:27:51 -07:00
362 changed files with 13561 additions and 4286 deletions

View File

@@ -140,7 +140,11 @@ export enum ActionsEnum {
exportLogs = "exportLogs",
listApprovals = "listApprovals",
updateApprovals = "updateApprovals",
signSshKey = "signSshKey"
signSshKey = "signSshKey",
createEventStreamingDestination = "createEventStreamingDestination",
updateEventStreamingDestination = "updateEventStreamingDestination",
deleteEventStreamingDestination = "deleteEventStreamingDestination",
listEventStreamingDestinations = "listEventStreamingDestinations"
}
export async function checkUserActionPermission(

View File

@@ -60,8 +60,7 @@ function createDb() {
})
);
} else {
const maxReplicaConnections =
poolConfig?.max_replica_connections || 20;
const maxReplicaConnections = poolConfig?.max_replica_connections || 20;
for (const conn of replicaConnections) {
const replicaPool = createPool(
conn.connection_string,
@@ -91,4 +90,5 @@ export default db;
export const primaryDb = db.$primary;
export type Transaction = Parameters<
Parameters<(typeof db)["transaction"]>[0]
>[0];
>[0];
export const DB_TYPE: "pg" | "sqlite" = "pg";

View File

@@ -8,7 +8,8 @@ import {
real,
text,
index,
primaryKey
primaryKey,
uniqueIndex
} from "drizzle-orm/pg-core";
import { InferSelectModel } from "drizzle-orm";
import {
@@ -417,6 +418,46 @@ export const siteProvisioningKeyOrg = pgTable(
]
);
export const eventStreamingDestinations = pgTable(
"eventStreamingDestinations",
{
destinationId: serial("destinationId").primaryKey(),
orgId: varchar("orgId", { length: 255 })
.notNull()
.references(() => orgs.orgId, { onDelete: "cascade" }),
sendConnectionLogs: boolean("sendConnectionLogs").notNull().default(false),
sendRequestLogs: boolean("sendRequestLogs").notNull().default(false),
sendActionLogs: boolean("sendActionLogs").notNull().default(false),
sendAccessLogs: boolean("sendAccessLogs").notNull().default(false),
type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: boolean("enabled").notNull().default(true),
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
updatedAt: bigint("updatedAt", { mode: "number" }).notNull()
}
);
export const eventStreamingCursors = pgTable(
"eventStreamingCursors",
{
cursorId: serial("cursorId").primaryKey(),
destinationId: integer("destinationId")
.notNull()
.references(() => eventStreamingDestinations.destinationId, {
onDelete: "cascade"
}),
logType: varchar("logType", { length: 50 }).notNull(), // "request" | "action" | "access" | "connection"
lastSentId: bigint("lastSentId", { mode: "number" }).notNull().default(0),
lastSentAt: bigint("lastSentAt", { mode: "number" }) // epoch milliseconds, null if never sent
},
(table) => [
uniqueIndex("idx_eventStreamingCursors_dest_type").on(
table.destinationId,
table.logType
)
]
);
export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;
@@ -439,3 +480,18 @@ export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
export type SessionTransferToken = InferSelectModel<
typeof sessionTransferToken
>;
export type BannedEmail = InferSelectModel<typeof bannedEmails>;
export type BannedIp = InferSelectModel<typeof bannedIps>;
export type SiteProvisioningKey = InferSelectModel<typeof siteProvisioningKeys>;
export type SiteProvisioningKeyOrg = InferSelectModel<
typeof siteProvisioningKeyOrg
>;
export type EventStreamingDestination = InferSelectModel<
typeof eventStreamingDestinations
>;
export type EventStreamingCursor = InferSelectModel<
typeof eventStreamingCursors
>;

View File

@@ -101,7 +101,7 @@ export const sites = pgTable("sites", {
lastHolePunch: bigint("lastHolePunch", { mode: "number" }),
listenPort: integer("listenPort"),
dockerSocketEnabled: boolean("dockerSocketEnabled").notNull().default(true),
status: varchar("status").$type<"pending" | "approved">()
status: varchar("status").$type<"pending" | "approved">().default("approved")
});
export const resources = pgTable("resources", {
@@ -1080,6 +1080,7 @@ export type ResourceWhitelist = InferSelectModel<typeof resourceWhitelist>;
export type VersionMigration = InferSelectModel<typeof versionMigrations>;
export type ResourceRule = InferSelectModel<typeof resourceRules>;
export type Domain = InferSelectModel<typeof domains>;
export type DnsRecord = InferSelectModel<typeof dnsRecords>;
export type SupporterKey = InferSelectModel<typeof supporterKey>;
export type Idp = InferSelectModel<typeof idp>;
export type ApiKey = InferSelectModel<typeof apiKeys>;

View File

@@ -23,7 +23,8 @@ export default db;
export const primaryDb = db;
export type Transaction = Parameters<
Parameters<(typeof db)["transaction"]>[0]
>[0];
>[0];
export const DB_TYPE: "pg" | "sqlite" = "sqlite";
function checkFileExists(filePath: string): boolean {
try {

View File

@@ -5,9 +5,19 @@ import {
primaryKey,
real,
sqliteTable,
text
text,
uniqueIndex
} from "drizzle-orm/sqlite-core";
import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema";
import {
clients,
domains,
exitNodes,
orgs,
sessions,
siteResources,
sites,
users
} from "./schema";
export const certificates = sqliteTable("certificates", {
certId: integer("certId").primaryKey({ autoIncrement: true }),
@@ -401,6 +411,50 @@ export const siteProvisioningKeyOrg = sqliteTable(
]
);
export const eventStreamingDestinations = sqliteTable(
"eventStreamingDestinations",
{
destinationId: integer("destinationId").primaryKey({
autoIncrement: true
}),
orgId: text("orgId")
.notNull()
.references(() => orgs.orgId, { onDelete: "cascade" }),
sendConnectionLogs: integer("sendConnectionLogs", { mode: "boolean" }).notNull().default(false),
sendRequestLogs: integer("sendRequestLogs", { mode: "boolean" }).notNull().default(false),
sendActionLogs: integer("sendActionLogs", { mode: "boolean" }).notNull().default(false),
sendAccessLogs: integer("sendAccessLogs", { mode: "boolean" }).notNull().default(false),
type: text("type").notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: integer("enabled", { mode: "boolean" })
.notNull()
.default(true),
createdAt: integer("createdAt").notNull(),
updatedAt: integer("updatedAt").notNull()
}
);
export const eventStreamingCursors = sqliteTable(
"eventStreamingCursors",
{
cursorId: integer("cursorId").primaryKey({ autoIncrement: true }),
destinationId: integer("destinationId")
.notNull()
.references(() => eventStreamingDestinations.destinationId, {
onDelete: "cascade"
}),
logType: text("logType").notNull(), // "request" | "action" | "access" | "connection"
lastSentId: integer("lastSentId").notNull().default(0),
lastSentAt: integer("lastSentAt") // epoch milliseconds, null if never sent
},
(table) => [
uniqueIndex("idx_eventStreamingCursors_dest_type").on(
table.destinationId,
table.logType
)
]
);
export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;
@@ -423,3 +477,12 @@ export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
export type BannedEmail = InferSelectModel<typeof bannedEmails>;
export type BannedIp = InferSelectModel<typeof bannedIps>;
export type SiteProvisioningKey = InferSelectModel<typeof siteProvisioningKeys>;
export type EventStreamingDestination = InferSelectModel<
typeof eventStreamingDestinations
>;
export type EventStreamingCursor = InferSelectModel<
typeof eventStreamingCursors
>;

View File

@@ -111,7 +111,7 @@ export const sites = sqliteTable("sites", {
dockerSocketEnabled: integer("dockerSocketEnabled", { mode: "boolean" })
.notNull()
.default(true),
status: text("status").$type<"pending" | "approved">()
status: text("status").$type<"pending" | "approved">().default("approved")
});
export const resources = sqliteTable("resources", {

View File

@@ -9,8 +9,8 @@ export type LicensePriceSet = {
export const licensePriceSet: LicensePriceSet = {
// Free license matches the freeLimitSet
[LicenseId.SMALL_LICENSE]: "price_1SxKHiD3Ee2Ir7WmvtEh17A8",
[LicenseId.BIG_LICENSE]: "price_1SxKHiD3Ee2Ir7WmMUiP0H6Y"
[LicenseId.SMALL_LICENSE]: "price_1TMJzmD3Ee2Ir7Wm05NlGImT",
[LicenseId.BIG_LICENSE]: "price_1TMJzzD3Ee2Ir7WmzJw9TerS"
};
export const licensePriceSetSandbox: LicensePriceSet = {

View File

@@ -18,7 +18,9 @@ export enum TierFeature {
AutoProvisioning = "autoProvisioning", // handle downgrade by disabling auto provisioning
SshPam = "sshPam",
FullRbac = "fullRbac",
SiteProvisioningKeys = "siteProvisioningKeys" // handle downgrade by revoking keys if needed
SiteProvisioningKeys = "siteProvisioningKeys", // handle downgrade by revoking keys if needed
SIEM = "siem", // handle downgrade by disabling SIEM integrations
DomainNamespaces = "domainNamespaces" // handle downgrade by removing custom domain namespaces
}
export const tierMatrix: Record<TierFeature, Tier[]> = {
@@ -54,5 +56,7 @@ export const tierMatrix: Record<TierFeature, Tier[]> = {
[TierFeature.AutoProvisioning]: ["tier1", "tier3", "enterprise"],
[TierFeature.SshPam]: ["tier1", "tier3", "enterprise"],
[TierFeature.FullRbac]: ["tier1", "tier2", "tier3", "enterprise"],
[TierFeature.SiteProvisioningKeys]: ["enterprise"]
[TierFeature.SiteProvisioningKeys]: ["tier3", "enterprise"],
[TierFeature.SIEM]: ["enterprise"],
[TierFeature.DomainNamespaces]: ["tier1", "tier2", "tier3", "enterprise"]
};

View File

@@ -591,7 +591,7 @@ export function generateSubnetProxyTargetV2(
pubKey: string | null;
subnet: string | null;
}[]
): SubnetProxyTargetV2 | undefined {
): SubnetProxyTargetV2[] | undefined {
if (clients.length === 0) {
logger.debug(
`No clients have access to site resource ${siteResource.siteResourceId}, skipping target generation.`
@@ -599,7 +599,7 @@ export function generateSubnetProxyTargetV2(
return;
}
let target: SubnetProxyTargetV2 | null = null;
let targets: SubnetProxyTargetV2[] = [];
const portRange = [
...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"),
@@ -614,52 +614,54 @@ export function generateSubnetProxyTargetV2(
if (ipSchema.safeParse(destination).success) {
destination = `${destination}/32`;
target = {
targets.push({
sourcePrefixes: [],
destPrefix: destination,
portRange,
disableIcmp,
resourceId: siteResource.siteResourceId,
};
resourceId: siteResource.siteResourceId
});
}
if (siteResource.alias && siteResource.aliasAddress) {
// also push a match for the alias address
target = {
targets.push({
sourcePrefixes: [],
destPrefix: `${siteResource.aliasAddress}/32`,
rewriteTo: destination,
portRange,
disableIcmp,
resourceId: siteResource.siteResourceId,
};
resourceId: siteResource.siteResourceId
});
}
} else if (siteResource.mode == "cidr") {
target = {
targets.push({
sourcePrefixes: [],
destPrefix: siteResource.destination,
portRange,
disableIcmp,
resourceId: siteResource.siteResourceId,
};
resourceId: siteResource.siteResourceId
});
}
if (!target) {
if (targets.length == 0) {
return;
}
for (const clientSite of clients) {
if (!clientSite.subnet) {
logger.debug(
`Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.`
);
continue;
for (const target of targets) {
for (const clientSite of clients) {
if (!clientSite.subnet) {
logger.debug(
`Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.`
);
continue;
}
const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`;
// add client prefix to source prefixes
target.sourcePrefixes.push(clientPrefix);
}
const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`;
// add client prefix to source prefixes
target.sourcePrefixes.push(clientPrefix);
}
// print a nice representation of the targets
@@ -667,36 +669,34 @@ export function generateSubnetProxyTargetV2(
// `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}`
// );
return target;
return targets;
}
/**
* Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1)
* by expanding each source prefix into its own target entry.
* @param targetV2 - The v2 target to convert
* @returns Array of v1 SubnetProxyTarget objects
*/
export function convertSubnetProxyTargetsV2ToV1(
targetsV2: SubnetProxyTargetV2[]
): SubnetProxyTarget[] {
return targetsV2.flatMap((targetV2) =>
targetV2.sourcePrefixes.map((sourcePrefix) => ({
sourcePrefix,
destPrefix: targetV2.destPrefix,
...(targetV2.disableIcmp !== undefined && {
disableIcmp: targetV2.disableIcmp
}),
...(targetV2.rewriteTo !== undefined && {
rewriteTo: targetV2.rewriteTo
}),
...(targetV2.portRange !== undefined && {
portRange: targetV2.portRange
})
}))
);
}
export function convertSubnetProxyTargetsV2ToV1(
targetsV2: SubnetProxyTargetV2[]
): SubnetProxyTarget[] {
return targetsV2.flatMap((targetV2) =>
targetV2.sourcePrefixes.map((sourcePrefix) => ({
sourcePrefix,
destPrefix: targetV2.destPrefix,
...(targetV2.disableIcmp !== undefined && {
disableIcmp: targetV2.disableIcmp
}),
...(targetV2.rewriteTo !== undefined && {
rewriteTo: targetV2.rewriteTo
}),
...(targetV2.portRange !== undefined && {
portRange: targetV2.portRange
})
}))
);
}
// Custom schema for validating port range strings
// Format: "80,443,8000-9000" or "*" for all ports, or empty string

View File

@@ -1,3 +1,5 @@
// Normalizes
/**
* Normalizes a post-authentication path for safe use when building redirect URLs.
* Returns a path that starts with / and does not allow open redirects (no //, no :).

View File

@@ -661,16 +661,16 @@ async function handleSubnetProxyTargetUpdates(
);
if (addedClients.length > 0) {
const targetToAdd = generateSubnetProxyTargetV2(
const targetsToAdd = generateSubnetProxyTargetV2(
siteResource,
addedClients
);
if (targetToAdd) {
if (targetsToAdd) {
proxyJobs.push(
addSubnetProxyTargets(
newt.newtId,
[targetToAdd],
targetsToAdd,
newt.version
)
);
@@ -698,16 +698,16 @@ async function handleSubnetProxyTargetUpdates(
);
if (removedClients.length > 0) {
const targetToRemove = generateSubnetProxyTargetV2(
const targetsToRemove = generateSubnetProxyTargetV2(
siteResource,
removedClients
);
if (targetToRemove) {
if (targetsToRemove) {
proxyJobs.push(
removeSubnetProxyTargets(
newt.newtId,
[targetToRemove],
targetsToRemove,
newt.version
)
);
@@ -1164,7 +1164,7 @@ async function handleMessagesForClientResources(
}
for (const resource of resources) {
const target = generateSubnetProxyTargetV2(resource, [
const targets = generateSubnetProxyTargetV2(resource, [
{
clientId: client.clientId,
pubKey: client.pubKey,
@@ -1172,11 +1172,11 @@ async function handleMessagesForClientResources(
}
]);
if (target) {
if (targets) {
proxyJobs.push(
addSubnetProxyTargets(
newt.newtId,
[target],
targets,
newt.version
)
);
@@ -1241,7 +1241,7 @@ async function handleMessagesForClientResources(
}
for (const resource of resources) {
const target = generateSubnetProxyTargetV2(resource, [
const targets = generateSubnetProxyTargetV2(resource, [
{
clientId: client.clientId,
pubKey: client.pubKey,
@@ -1249,11 +1249,11 @@ async function handleMessagesForClientResources(
}
]);
if (target) {
if (targets) {
proxyJobs.push(
removeSubnetProxyTargets(
newt.newtId,
[target],
targets,
newt.version
)
);

View File

@@ -1,3 +1,5 @@
// Sanitizes
/**
* Sanitize a string field before inserting into a database TEXT column.
*
@@ -37,4 +39,4 @@ export function sanitizeString(
// Strip null bytes, C0 control chars (except HT/LF/CR), and DEL.
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "")
);
}
}

View File

@@ -1,3 +1,5 @@
// tokenCache
/**
* Returns a cached plaintext token from Redis if one exists and decrypts
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the

View File

@@ -19,6 +19,7 @@ export class TraefikConfigManager {
private timeoutId: NodeJS.Timeout | null = null;
private lastCertificateFetch: Date | null = null;
private lastKnownDomains = new Set<string>();
private pendingDeletion = new Map<string, number>(); // domain -> cycles remaining before delete
private lastLocalCertificateState = new Map<
string,
{
@@ -1004,33 +1005,62 @@ export class TraefikConfigManager {
const dirName = dirent.name;
// Only delete if NO current domain is exactly the same or ends with `.${dirName}`
const shouldDelete = !Array.from(currentActiveDomains).some(
const isUnused = !Array.from(currentActiveDomains).some(
(domain) =>
domain === dirName || domain.endsWith(`.${dirName}`)
);
if (shouldDelete) {
const domainDir = path.join(certsPath, dirName);
logger.info(
`Cleaning up unused certificate directory: ${dirName}`
);
fs.rmSync(domainDir, { recursive: true, force: true });
// Remove from local state tracking
this.lastLocalCertificateState.delete(dirName);
// Remove from dynamic config
const certFilePath = path.join(domainDir, "cert.pem");
const keyFilePath = path.join(domainDir, "key.pem");
const before = dynamicConfig.tls.certificates.length;
dynamicConfig.tls.certificates =
dynamicConfig.tls.certificates.filter(
(entry: any) =>
entry.certFile !== certFilePath &&
entry.keyFile !== keyFilePath
if (!isUnused) {
// Domain is still active — remove from pending deletion if it was queued
if (this.pendingDeletion.has(dirName)) {
logger.info(
`Certificate ${dirName} is active again, cancelling pending deletion`
);
if (dynamicConfig.tls.certificates.length !== before) {
configChanged = true;
this.pendingDeletion.delete(dirName);
}
continue;
}
// Domain is unused — add to pending deletion or decrement its counter
if (!this.pendingDeletion.has(dirName)) {
const graceCycles = 3;
logger.info(
`Certificate ${dirName} is no longer in use. Will delete after ${graceCycles} more cycles.`
);
this.pendingDeletion.set(dirName, graceCycles);
} else {
const remaining = this.pendingDeletion.get(dirName)! - 1;
if (remaining > 0) {
logger.info(
`Certificate ${dirName} pending deletion: ${remaining} cycle(s) remaining`
);
this.pendingDeletion.set(dirName, remaining);
} else {
// Grace period expired — actually delete now
this.pendingDeletion.delete(dirName);
const domainDir = path.join(certsPath, dirName);
logger.info(
`Cleaning up unused certificate directory: ${dirName}`
);
fs.rmSync(domainDir, { recursive: true, force: true });
// Remove from local state tracking
this.lastLocalCertificateState.delete(dirName);
// Remove from dynamic config
const certFilePath = path.join(domainDir, "cert.pem");
const keyFilePath = path.join(domainDir, "key.pem");
const before = dynamicConfig.tls.certificates.length;
dynamicConfig.tls.certificates =
dynamicConfig.tls.certificates.filter(
(entry: any) =>
entry.certFile !== certFilePath &&
entry.keyFile !== keyFilePath
);
if (dynamicConfig.tls.certificates.length !== before) {
configChanged = true;
}
}
}
}

View File

@@ -479,10 +479,7 @@ export async function getTraefikConfig(
// TODO: HOW TO HANDLE ^^^^^^ BETTER
const anySitesOnline = targets.some(
(target) =>
target.site.online ||
target.site.type === "local" ||
target.site.type === "wireguard"
(target) => target.site.online
);
return (
@@ -495,7 +492,7 @@ export async function getTraefikConfig(
if (target.health == "unhealthy") {
return false;
}
// If any sites are online, exclude offline sites
if (anySitesOnline && !target.site.online) {
return false;
@@ -610,10 +607,7 @@ export async function getTraefikConfig(
servers: (() => {
// Check if any sites are online
const anySitesOnline = targets.some(
(target) =>
target.site.online ||
target.site.type === "local" ||
target.site.type === "wireguard"
(target) => target.site.online
);
return targets
@@ -621,7 +615,7 @@ export async function getTraefikConfig(
if (!target.enabled) {
return false;
}
// If any sites are online, exclude offline sites
if (anySitesOnline && !target.site.online) {
return false;

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
@@ -12,6 +12,7 @@
*/
import { rateLimitService } from "#private/lib/rateLimit";
import { logStreamingManager } from "#private/lib/logStreaming";
import { cleanup as wsCleanup } from "#private/routers/ws";
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushConnectionLogToDb } from "#private/routers/newt";
@@ -25,6 +26,7 @@ async function cleanup() {
await flushSiteBandwidthToDb();
await rateLimitService.cleanup();
await wsCleanup();
await logStreamingManager.shutdown();
process.exit(0);
}

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -0,0 +1,234 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { logsDb, connectionAuditLog } from "@server/db";
import logger from "@server/logger";
import { and, eq, lt } from "drizzle-orm";
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
// ---------------------------------------------------------------------------
// Retry configuration for deadlock handling
// ---------------------------------------------------------------------------
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// ---------------------------------------------------------------------------
// Buffer / flush configuration
// ---------------------------------------------------------------------------
/** How often to flush accumulated connection log data to the database. */
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
/** Maximum number of records to buffer before forcing a flush. */
const MAX_BUFFERED_RECORDS = 500;
/** Maximum number of records to insert in a single database batch. */
const INSERT_BATCH_SIZE = 100;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
export interface ConnectionLogRecord {
sessionId: string;
siteResourceId: number;
orgId: string;
siteId: number;
clientId: number | null;
userId: string | null;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: number; // epoch seconds
endedAt: number | null;
bytesTx: number | null;
bytesRx: number | null;
}
// ---------------------------------------------------------------------------
// In-memory buffer
// ---------------------------------------------------------------------------
let buffer: ConnectionLogRecord[] = [];
// ---------------------------------------------------------------------------
// Deadlock helpers
// ---------------------------------------------------------------------------
function isDeadlockError(error: any): boolean {
return (
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"))
);
}
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
// ---------------------------------------------------------------------------
// Flush
// ---------------------------------------------------------------------------
/**
* Flush all buffered connection log records to the database.
*
* Swaps out the buffer before writing so that any records added during the
* flush are captured in the new buffer rather than being lost. Entries that
* fail to write are re-queued back into the buffer so they will be retried
* on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushConnectionLogToDb(): Promise<void> {
if (buffer.length === 0) {
return;
}
// Atomically swap out the buffer so new data keeps flowing in
const snapshot = buffer;
buffer = [];
logger.debug(
`Flushing ${snapshot.length} connection log record(s) to the database`
);
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
try {
await withDeadlockRetry(async () => {
await logsDb.insert(connectionAuditLog).values(batch);
}, `flush connection log batch (${batch.length} records)`);
} catch (error) {
logger.error(
`Failed to flush connection log batch of ${batch.length} records:`,
error
);
// Re-queue the failed batch so it is retried on the next flush
buffer = [...batch, ...buffer];
// Cap buffer to prevent unbounded growth if the DB is unreachable
const hardLimit = MAX_BUFFERED_RECORDS * 5;
if (buffer.length > hardLimit) {
const dropped = buffer.length - hardLimit;
buffer = buffer.slice(0, hardLimit);
logger.warn(
`Connection log buffer overflow, dropped ${dropped} oldest records`
);
}
// Stop processing further batches from this snapshot — they will
// be picked up via the re-queued records on the next flush.
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
if (remaining.length > 0) {
buffer = [...remaining, ...buffer];
}
break;
}
}
}
// ---------------------------------------------------------------------------
// Periodic flush timer
// ---------------------------------------------------------------------------
const flushTimer = setInterval(async () => {
try {
await flushConnectionLogToDb();
} catch (error) {
logger.error(
"Unexpected error during periodic connection log flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
// before process.exit(), so no data is lost.
flushTimer.unref();
// ---------------------------------------------------------------------------
// Cleanup
// ---------------------------------------------------------------------------
export async function cleanUpOldLogs(
orgId: string,
retentionDays: number
): Promise<void> {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await logsDb
.delete(connectionAuditLog)
.where(
and(
lt(connectionAuditLog.startedAt, cutoffTimestamp),
eq(connectionAuditLog.orgId, orgId)
)
);
} catch (error) {
logger.error("Error cleaning up old connection audit logs:", error);
}
}
// ---------------------------------------------------------------------------
// Public logging entry-point
// ---------------------------------------------------------------------------
/**
* Buffer a single connection log record for eventual persistence.
*
* Records are written to the database in batches either when the buffer
* reaches MAX_BUFFERED_RECORDS or when the periodic flush timer fires.
*/
export function logConnectionAudit(record: ConnectionLogRecord): void {
buffer.push(record);
if (buffer.length >= MAX_BUFFERED_RECORDS) {
// Fire and forget — errors are handled inside flushConnectionLogToDb
flushConnectionLogToDb().catch((error) => {
logger.error(
"Unexpected error during size-triggered connection log flush:",
error
);
});
}
}

View File

@@ -0,0 +1,776 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import {
db,
logsDb,
eventStreamingDestinations,
eventStreamingCursors,
requestAuditLog,
actionAuditLog,
accessAuditLog,
connectionAuditLog
} from "@server/db";
import logger from "@server/logger";
import { and, eq, gt, desc, max, sql } from "drizzle-orm";
import { decrypt } from "@server/lib/crypto";
import config from "@server/lib/config";
import {
LogType,
LOG_TYPES,
LogEvent,
DestinationFailureState,
HttpConfig
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
import type { EventStreamingDestination } from "@server/db";
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
/**
* How often (ms) the manager polls all destinations for new log records.
* Destinations that were behind (full batch returned) will be re-polled
* immediately without waiting for this interval.
*/
const POLL_INTERVAL_MS = 30_000;
/**
* Maximum number of log records fetched from the DB in a single query.
* This also controls the maximum size of one HTTP POST body.
*/
const BATCH_SIZE = 250;
/**
* Minimum delay (ms) between consecutive HTTP requests to the same destination
* during a catch-up run. Prevents bursting thousands of requests back-to-back
* when a destination has fallen behind.
*/
const INTER_BATCH_DELAY_MS = 100;
/**
* Maximum number of consecutive back-to-back batches to process for a single
* destination per poll cycle. After this limit the destination will wait for
* the next scheduled poll before continuing, giving other destinations a turn.
*/
const MAX_CATCHUP_BATCHES = 20;
/**
* Back-off schedule (ms) indexed by consecutive failure count.
* After the last entry the max value is re-used.
*/
const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
];
/**
* If a destination has been continuously unreachable for this long, its
* cursors are advanced to the current max row id and the backlog is silently
* discarded. This prevents unbounded queue growth when a webhook endpoint is
* down for an extended period. A prominent warning is logged so operators are
* aware logs were dropped.
*
* Default: 24 hours.
*/
const MAX_BACKLOG_DURATION_MS = 24 * 60 * 60_000;
// ---------------------------------------------------------------------------
// LogStreamingManager
// ---------------------------------------------------------------------------
/**
* Orchestrates periodic polling of the four audit-log tables and forwards new
* records to every enabled event-streaming destination.
*
* ### Design
* - **Interval-based**: a timer fires every `POLL_INTERVAL_MS`. On each tick
* every enabled destination is processed in sequence.
* - **Cursor-based**: the last successfully forwarded row `id` is persisted in
* the `eventStreamingCursors` table so state survives restarts.
* - **Catch-up**: if a full batch is returned the destination is immediately
* re-queried (up to `MAX_CATCHUP_BATCHES` times) before yielding.
* - **Smoothing**: `INTER_BATCH_DELAY_MS` is inserted between consecutive
* catch-up batches to avoid hammering the remote endpoint.
* - **Back-off**: consecutive send failures trigger exponential back-off
* (tracked in-memory per destination). Successful sends reset the counter.
* - **Backlog abandonment**: if a destination remains unreachable for longer
* than `MAX_BACKLOG_DURATION_MS`, all cursors for that destination are
* advanced to the current max id so the backlog is discarded and streaming
* resumes from the present moment on recovery.
*/
export class LogStreamingManager {
private pollTimer: ReturnType<typeof setTimeout> | null = null;
private isRunning = false;
private isPolling = false;
/** In-memory back-off state keyed by destinationId. */
private readonly failures = new Map<number, DestinationFailureState>();
// -------------------------------------------------------------------------
// Lifecycle
// -------------------------------------------------------------------------
start(): void {
if (this.isRunning) return;
this.isRunning = true;
logger.debug("LogStreamingManager: started");
this.schedulePoll(POLL_INTERVAL_MS);
}
// -------------------------------------------------------------------------
// Cursor initialisation (call this when a destination is first created)
// -------------------------------------------------------------------------
/**
* Eagerly seed cursors for every log type at the **current** max row id of
* each table, scoped to the destination's org.
*
* Call this immediately after inserting a new row into
* `eventStreamingDestinations` so the destination only receives events
* that were written *after* it was created. If a cursor row already exists
* (e.g. the method is called twice) it is left untouched.
*
* The manager also has a lazy fallback inside `getOrCreateCursor` for
* destinations that existed before this method was introduced.
*/
async initializeCursorsForDestination(
destinationId: number,
orgId: string
): Promise<void> {
for (const logType of LOG_TYPES) {
const currentMaxId = await this.getCurrentMaxId(logType, orgId);
try {
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: currentMaxId,
lastSentAt: null
})
.onConflictDoNothing();
} catch (err) {
logger.warn(
`LogStreamingManager: could not initialise cursor for ` +
`destination ${destinationId} logType="${logType}"`,
err
);
}
}
logger.debug(
`LogStreamingManager: cursors initialised for destination ${destinationId} ` +
`(org=${orgId})`
);
}
async shutdown(): Promise<void> {
this.isRunning = false;
if (this.pollTimer !== null) {
clearTimeout(this.pollTimer);
this.pollTimer = null;
}
// Wait for any in-progress poll to finish before returning so that
// callers (graceful-shutdown handlers) can safely exit afterward.
const deadline = Date.now() + 15_000;
while (this.isPolling && Date.now() < deadline) {
await sleep(100);
}
logger.info("LogStreamingManager: stopped");
}
// -------------------------------------------------------------------------
// Scheduling
// -------------------------------------------------------------------------
private schedulePoll(delayMs: number): void {
this.pollTimer = setTimeout(() => {
this.pollTimer = null;
this.runPoll()
.catch((err) =>
logger.error("LogStreamingManager: unexpected poll error", err)
)
.finally(() => {
if (this.isRunning) {
this.schedulePoll(POLL_INTERVAL_MS);
}
});
}, delayMs);
// Do not keep the event loop alive just for the poll timer the
// graceful-shutdown path calls shutdown() explicitly.
this.pollTimer.unref?.();
}
// -------------------------------------------------------------------------
// Poll cycle
// -------------------------------------------------------------------------
private async runPoll(): Promise<void> {
if (this.isPolling) return; // previous poll still running skip
this.isPolling = true;
try {
const destinations = await this.loadEnabledDestinations();
if (destinations.length === 0) return;
for (const dest of destinations) {
if (!this.isRunning) break;
await this.processDestination(dest).catch((err) => {
// Individual destination errors must never abort the whole cycle
logger.error(
`LogStreamingManager: unhandled error for destination ${dest.destinationId}`,
err
);
});
}
} finally {
this.isPolling = false;
}
}
// -------------------------------------------------------------------------
// Per-destination processing
// -------------------------------------------------------------------------
private async processDestination(
dest: EventStreamingDestination
): Promise<void> {
const failState = this.failures.get(dest.destinationId);
// Check whether this destination has been unreachable long enough that
// we should give up on the accumulated backlog.
if (failState) {
const failingForMs = Date.now() - failState.firstFailedAt;
if (failingForMs >= MAX_BACKLOG_DURATION_MS) {
await this.abandonBacklog(dest, failState);
this.failures.delete(dest.destinationId);
// Cursors now point to the current head retry on next poll.
return;
}
}
// Check regular exponential back-off window
if (failState && Date.now() < failState.nextRetryAt) {
logger.debug(
`LogStreamingManager: destination ${dest.destinationId} in back-off, skipping`
);
return;
}
// Decrypt and parse config skip destination if either step fails
let configFromDb: HttpConfig;
try {
const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
} catch (err) {
logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
err
);
return;
}
const provider = this.createProvider(dest.type, configFromDb);
if (!provider) {
logger.warn(
`LogStreamingManager: unsupported destination type "${dest.type}" ` +
`for destination ${dest.destinationId} skipping`
);
return;
}
const enabledTypes: LogType[] = [];
if (dest.sendRequestLogs) enabledTypes.push("request");
if (dest.sendActionLogs) enabledTypes.push("action");
if (dest.sendAccessLogs) enabledTypes.push("access");
if (dest.sendConnectionLogs) enabledTypes.push("connection");
if (enabledTypes.length === 0) return;
let anyFailure = false;
for (const logType of enabledTypes) {
if (!this.isRunning) break;
try {
await this.processLogType(dest, provider, logType);
} catch (err) {
anyFailure = true;
logger.error(
`LogStreamingManager: failed to process "${logType}" logs ` +
`for destination ${dest.destinationId}`,
err
);
}
}
if (anyFailure) {
this.recordFailure(dest.destinationId);
} else {
// Any success resets the failure/back-off state
if (this.failures.has(dest.destinationId)) {
this.failures.delete(dest.destinationId);
logger.info(
`LogStreamingManager: destination ${dest.destinationId} recovered`
);
}
}
}
/**
* Advance every cursor for the destination to the current max row id,
* effectively discarding the accumulated backlog. Called when the
* destination has been unreachable for longer than MAX_BACKLOG_DURATION_MS.
*/
private async abandonBacklog(
dest: EventStreamingDestination,
failState: DestinationFailureState
): Promise<void> {
const failingForHours = (
(Date.now() - failState.firstFailedAt) /
3_600_000
).toFixed(1);
let totalDropped = 0;
for (const logType of LOG_TYPES) {
try {
const currentMaxId = await this.getCurrentMaxId(
logType,
dest.orgId
);
// Find out how many rows are being skipped for this type
const cursor = await db
.select({ lastSentId: eventStreamingCursors.lastSentId })
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, dest.destinationId),
eq(eventStreamingCursors.logType, logType)
)
)
.limit(1);
const prevId = cursor[0]?.lastSentId ?? currentMaxId;
totalDropped += Math.max(0, currentMaxId - prevId);
await this.updateCursor(
dest.destinationId,
logType,
currentMaxId
);
} catch (err) {
logger.error(
`LogStreamingManager: failed to advance cursor for ` +
`destination ${dest.destinationId} logType="${logType}" ` +
`during backlog abandonment`,
err
);
}
}
logger.warn(
`LogStreamingManager: destination ${dest.destinationId} has been ` +
`unreachable for ${failingForHours}h ` +
`(${failState.consecutiveFailures} consecutive failures). ` +
`Discarding backlog of ~${totalDropped} log event(s) and ` +
`resuming from the current position. ` +
`Verify the destination URL and credentials.`
);
}
/**
* Forward all pending log records of a specific type for a destination.
*
* Fetches up to `BATCH_SIZE` records at a time. If the batch is full
* (indicating more records may exist) it loops immediately, inserting a
* short delay between consecutive requests to the remote endpoint.
* The loop is capped at `MAX_CATCHUP_BATCHES` to keep the poll cycle
* bounded.
*/
private async processLogType(
dest: EventStreamingDestination,
provider: LogDestinationProvider,
logType: LogType
): Promise<void> {
// Ensure a cursor row exists (creates one pointing at the current max
// id so we do not replay historical logs on first run)
const cursor = await this.getOrCreateCursor(
dest.destinationId,
logType,
dest.orgId
);
let lastSentId = cursor.lastSentId;
let batchCount = 0;
while (batchCount < MAX_CATCHUP_BATCHES) {
const rows = await this.fetchLogs(
logType,
dest.orgId,
lastSentId,
BATCH_SIZE
);
if (rows.length === 0) break;
const events = rows.map((row) =>
this.rowToLogEvent(logType, row)
);
// Throws on failure caught by the caller which applies back-off
await provider.send(events);
lastSentId = rows[rows.length - 1].id;
await this.updateCursor(dest.destinationId, logType, lastSentId);
batchCount++;
if (rows.length < BATCH_SIZE) {
// Partial batch means we have caught up
break;
}
// Full batch there are likely more records; pause briefly before
// fetching the next batch to smooth out the HTTP request rate
if (batchCount < MAX_CATCHUP_BATCHES) {
await sleep(INTER_BATCH_DELAY_MS);
}
}
}
// -------------------------------------------------------------------------
// Cursor management
// -------------------------------------------------------------------------
private async getOrCreateCursor(
destinationId: number,
logType: LogType,
orgId: string
): Promise<{ lastSentId: number }> {
// Try to read an existing cursor
const existing = await db
.select({
lastSentId: eventStreamingCursors.lastSentId
})
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, destinationId),
eq(eventStreamingCursors.logType, logType)
)
)
.limit(1);
if (existing.length > 0) {
return { lastSentId: existing[0].lastSentId };
}
// No cursor yet this destination pre-dates the eager initialisation
// path (initializeCursorsForDestination). Seed at the current max id
// so we do not replay historical logs.
const initialId = await this.getCurrentMaxId(logType, orgId);
// Use onConflictDoNothing in case of a rare race between two poll
// cycles both hitting this branch simultaneously.
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: initialId,
lastSentAt: null
})
.onConflictDoNothing();
logger.debug(
`LogStreamingManager: lazily initialised cursor for destination ${destinationId} ` +
`logType="${logType}" at id=${initialId} ` +
`(prefer initializeCursorsForDestination at creation time)`
);
return { lastSentId: initialId };
}
private async updateCursor(
destinationId: number,
logType: LogType,
lastSentId: number
): Promise<void> {
await db
.update(eventStreamingCursors)
.set({
lastSentId,
lastSentAt: Date.now()
})
.where(
and(
eq(eventStreamingCursors.destinationId, destinationId),
eq(eventStreamingCursors.logType, logType)
)
);
}
/**
* Returns the current maximum `id` in the given log table for the org.
* Returns 0 when the table is empty.
*/
private async getCurrentMaxId(
logType: LogType,
orgId: string
): Promise<number> {
try {
switch (logType) {
case "request": {
const [row] = await logsDb
.select({ maxId: max(requestAuditLog.id) })
.from(requestAuditLog)
.where(eq(requestAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "action": {
const [row] = await logsDb
.select({ maxId: max(actionAuditLog.id) })
.from(actionAuditLog)
.where(eq(actionAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "access": {
const [row] = await logsDb
.select({ maxId: max(accessAuditLog.id) })
.from(accessAuditLog)
.where(eq(accessAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "connection": {
const [row] = await logsDb
.select({ maxId: max(connectionAuditLog.id) })
.from(connectionAuditLog)
.where(eq(connectionAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
}
} catch (err) {
logger.warn(
`LogStreamingManager: could not determine current max id for ` +
`logType="${logType}", defaulting to 0`,
err
);
return 0;
}
}
// -------------------------------------------------------------------------
// Log fetching
// -------------------------------------------------------------------------
/**
* Fetch up to `limit` log rows with `id > afterId`, ordered by id ASC,
* filtered to the given organisation.
*/
private async fetchLogs(
logType: LogType,
orgId: string,
afterId: number,
limit: number
): Promise<Array<Record<string, unknown> & { id: number }>> {
switch (logType) {
case "request":
return (await logsDb
.select()
.from(requestAuditLog)
.where(
and(
eq(requestAuditLog.orgId, orgId),
gt(requestAuditLog.id, afterId)
)
)
.orderBy(requestAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "action":
return (await logsDb
.select()
.from(actionAuditLog)
.where(
and(
eq(actionAuditLog.orgId, orgId),
gt(actionAuditLog.id, afterId)
)
)
.orderBy(actionAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "access":
return (await logsDb
.select()
.from(accessAuditLog)
.where(
and(
eq(accessAuditLog.orgId, orgId),
gt(accessAuditLog.id, afterId)
)
)
.orderBy(accessAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "connection":
return (await logsDb
.select()
.from(connectionAuditLog)
.where(
and(
eq(connectionAuditLog.orgId, orgId),
gt(connectionAuditLog.id, afterId)
)
)
.orderBy(connectionAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
}
}
// -------------------------------------------------------------------------
// Row → LogEvent conversion
// -------------------------------------------------------------------------
private rowToLogEvent(
logType: LogType,
row: Record<string, unknown> & { id: number }
): LogEvent {
// Determine the epoch-seconds timestamp for this row type
let timestamp: number;
switch (logType) {
case "request":
case "action":
case "access":
timestamp =
typeof row.timestamp === "number" ? row.timestamp : 0;
break;
case "connection":
timestamp =
typeof row.startedAt === "number" ? row.startedAt : 0;
break;
}
const orgId =
typeof row.orgId === "string" ? row.orgId : "";
return {
id: row.id,
logType,
orgId,
timestamp,
data: row as Record<string, unknown>
};
}
// -------------------------------------------------------------------------
// Provider factory
// -------------------------------------------------------------------------
/**
* Instantiate the correct LogDestinationProvider for the given destination
* type string. Returns `null` for unknown types.
*
* To add a new provider:
* 1. Implement `LogDestinationProvider` in a new file under `providers/`
* 2. Add a `case` here
*/
private createProvider(
type: string,
config: unknown
): LogDestinationProvider | null {
switch (type) {
case "http":
return new HttpLogDestination(config as HttpConfig);
// Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default:
return null;
}
}
// -------------------------------------------------------------------------
// Back-off tracking
// -------------------------------------------------------------------------
private recordFailure(destinationId: number): void {
const current = this.failures.get(destinationId) ?? {
consecutiveFailures: 0,
nextRetryAt: 0,
// Stamp the very first failure so we can measure total outage duration
firstFailedAt: Date.now()
};
current.consecutiveFailures += 1;
const scheduleIdx = Math.min(
current.consecutiveFailures - 1,
BACKOFF_SCHEDULE_MS.length - 1
);
const backoffMs = BACKOFF_SCHEDULE_MS[scheduleIdx];
current.nextRetryAt = Date.now() + backoffMs;
this.failures.set(destinationId, current);
logger.warn(
`LogStreamingManager: destination ${destinationId} failed ` +
`(consecutive #${current.consecutiveFailures}), ` +
`backing off for ${backoffMs / 1000}s`
);
}
// -------------------------------------------------------------------------
// DB helpers
// -------------------------------------------------------------------------
private async loadEnabledDestinations(): Promise<
EventStreamingDestination[]
> {
try {
return await db
.select()
.from(eventStreamingDestinations)
.where(eq(eventStreamingDestinations.enabled, true));
} catch (err) {
logger.error(
"LogStreamingManager: failed to load destinations",
err
);
return [];
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@@ -0,0 +1,34 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { build } from "@server/build";
import { LogStreamingManager } from "./LogStreamingManager";
/**
* Module-level singleton. Importing this module is sufficient to start the
* streaming manager no explicit init call required by the caller.
*
* The manager registers a non-blocking timer (unref'd) so it will not keep
* the Node.js event loop alive on its own. Call `logStreamingManager.shutdown()`
* during graceful shutdown to drain any in-progress poll and release resources.
*/
export const logStreamingManager = new LogStreamingManager();
if (build != "saas") { // this is handled separately in the saas build, so we don't want to start it here
logStreamingManager.start();
}
export { LogStreamingManager } from "./LogStreamingManager";
export type { LogDestinationProvider } from "./providers/LogDestinationProvider";
export { HttpLogDestination } from "./providers/HttpLogDestination";
export * from "./types";

View File

@@ -0,0 +1,322 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import logger from "@server/logger";
import { LogEvent, HttpConfig, PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single HTTP response. */
const REQUEST_TIMEOUT_MS = 30_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// HttpLogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an arbitrary HTTP endpoint via a single
* POST request per batch.
*
* **Payload format**
*
* **Payload formats** (controlled by `config.format`):
*
* - `json_array` (default) — one POST per batch, body is a JSON array:
* ```json
* [
* { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } },
* …
* ]
* ```
* `Content-Type: application/json`
*
* - `ndjson` — one POST per batch, body is newline-delimited JSON (one object
* per line, no outer array). Required by Splunk HEC, Elastic/OpenSearch,
* and Grafana Loki:
* ```
* {"event":"request","timestamp":"…","data":{…}}
* {"event":"action","timestamp":"…","data":{…}}
* ```
* `Content-Type: application/x-ndjson`
*
* - `json_single` — one POST **per event**, body is a plain JSON object.
* Use only for endpoints that cannot handle batches at all.
*
* With a body template each event is rendered through the template before
* serialisation. Template placeholders:
* - `{{event}}` → the LogType string ("request", "action", etc.)
* - `{{timestamp}}` → ISO-8601 UTC datetime string
* - `{{data}}` → raw inline JSON object (**no surrounding quotes**)
*
* Example template:
* ```
* { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} }
* ```
*/
export class HttpLogDestination implements LogDestinationProvider {
readonly type = "http";
private readonly config: HttpConfig;
constructor(config: HttpConfig) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
if (format === "json_single") {
// One HTTP POST per event send sequentially so a failure on one
// event throws and lets the manager retry the whole batch from the
// same cursor position.
for (const event of events) {
await this.postRequest(
this.buildSingleBody(event),
"application/json"
);
}
return;
}
if (format === "ndjson") {
const body = this.buildNdjsonBody(events);
await this.postRequest(body, "application/x-ndjson");
return;
}
// json_array (default)
const body = JSON.stringify(this.buildArrayPayload(events));
await this.postRequest(body, "application/json");
}
// -----------------------------------------------------------------------
// Internal HTTP sender
// -----------------------------------------------------------------------
private async postRequest(
body: string,
contentType: string
): Promise<void> {
const headers = this.buildHeaders(contentType);
const controller = new AbortController();
const timeoutHandle = setTimeout(
() => controller.abort(),
REQUEST_TIMEOUT_MS
);
let response: Response;
try {
response = await fetch(this.config.url, {
method: "POST",
headers,
body,
signal: controller.signal
});
} catch (err: unknown) {
const isAbort =
err instanceof Error && err.name === "AbortError";
if (isAbort) {
throw new Error(
`HttpLogDestination: request to "${this.config.url}" timed out after ${REQUEST_TIMEOUT_MS} ms`
);
}
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`HttpLogDestination: request to "${this.config.url}" failed ${msg}`
);
} finally {
clearTimeout(timeoutHandle);
}
if (!response.ok) {
// Try to include a snippet of the response body in the error so
// operators can diagnose auth or schema rejections.
let responseSnippet = "";
try {
const text = await response.text();
responseSnippet = text.slice(0, 300);
} catch {
// ignore best effort
}
throw new Error(
`HttpLogDestination: server at "${this.config.url}" returned ` +
`HTTP ${response.status} ${response.statusText}` +
(responseSnippet ? ` ${responseSnippet}` : "")
);
}
}
// -----------------------------------------------------------------------
// Header construction
// -----------------------------------------------------------------------
private buildHeaders(contentType: string): Record<string, string> {
const headers: Record<string, string> = {
"Content-Type": contentType
};
// Authentication
switch (this.config.authType) {
case "bearer": {
const token = this.config.bearerToken?.trim();
if (token) {
headers["Authorization"] = `Bearer ${token}`;
}
break;
}
case "basic": {
const creds = this.config.basicCredentials?.trim();
if (creds) {
const encoded = Buffer.from(creds).toString("base64");
headers["Authorization"] = `Basic ${encoded}`;
}
break;
}
case "custom": {
const name = this.config.customHeaderName?.trim();
const value = this.config.customHeaderValue ?? "";
if (name) {
headers[name] = value;
}
break;
}
case "none":
default:
// No Authorization header
break;
}
// Additional static headers (user-defined; may override Content-Type
// if the operator explicitly sets it, which is intentional).
for (const { key, value } of this.config.headers ?? []) {
const trimmedKey = key?.trim();
if (trimmedKey) {
headers[trimmedKey] = value ?? "";
}
}
return headers;
}
// -----------------------------------------------------------------------
// Payload construction
// -----------------------------------------------------------------------
/** Single default event object (no surrounding array). */
private buildEventObject(event: LogEvent): unknown {
if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) {
return this.renderTemplate(this.config.bodyTemplate!, event);
}
return {
event: event.logType,
timestamp: epochSecondsToIso(event.timestamp),
data: event.data
};
}
/** JSON array payload used for `json_array` format. */
private buildArrayPayload(events: LogEvent[]): unknown[] {
return events.map((e) => this.buildEventObject(e));
}
/**
* NDJSON payload one JSON object per line, no outer array.
* Each line must be a complete, valid JSON object.
*/
private buildNdjsonBody(events: LogEvent[]): string {
return events
.map((e) => JSON.stringify(this.buildEventObject(e)))
.join("\n");
}
/** Single-event body used for `json_single` format. */
private buildSingleBody(event: LogEvent): string {
return JSON.stringify(this.buildEventObject(event));
}
/**
* Render a single event through the body template.
*
* The three placeholder tokens are replaced in a specific order to avoid
* accidental double-replacement:
*
* 1. `{{data}}` → raw JSON (may contain `{{` characters in values)
* 2. `{{event}}` → safe string
* 3. `{{timestamp}}` → safe ISO string
*
* If the rendered string is not valid JSON we fall back to returning it as
* a plain string so the batch still makes it out and the operator can
* inspect the template.
*/
private renderTemplate(template: string, event: LogEvent): unknown {
const isoTimestamp = epochSecondsToIso(event.timestamp);
const dataJson = JSON.stringify(event.data);
// Replace {{data}} first because its JSON value might legitimately
// contain the substrings "{{event}}" or "{{timestamp}}" inside string
// fields those should NOT be re-expanded.
const rendered = template
.replace(/\{\{data\}\}/g, dataJson)
.replace(/\{\{event\}\}/g, escapeJsonString(event.logType))
.replace(
/\{\{timestamp\}\}/g,
escapeJsonString(isoTimestamp)
);
try {
return JSON.parse(rendered);
} catch {
logger.warn(
`HttpLogDestination: body template produced invalid JSON for ` +
`event type "${event.logType}" destined for "${this.config.url}". ` +
`Sending rendered template as a raw string. ` +
`Check your template syntax specifically that {{data}} is ` +
`NOT wrapped in quotes.`
);
return rendered;
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function epochSecondsToIso(epochSeconds: number): string {
return new Date(epochSeconds * 1000).toISOString();
}
/**
* Escape a string value so it can be safely substituted into the interior of
* a JSON string literal (i.e. between existing `"` quotes in the template).
* This prevents a crafted logType or timestamp from breaking out of its
* string context in the rendered template.
*/
function escapeJsonString(value: string): string {
// JSON.stringify produces `"<escaped>"` strip the outer quotes.
return JSON.stringify(value).slice(1, -1);
}

View File

@@ -0,0 +1,44 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { LogEvent } from "../types";
/**
* Common interface that every log-forwarding backend must implement.
*
* Adding a new destination type (e.g. Datadog, Splunk, Kafka) is as simple as
* creating a class that satisfies this interface and registering it inside
* LogStreamingManager.createProvider().
*/
export interface LogDestinationProvider {
/**
* The string identifier that matches the `type` column in the
* `eventStreamingDestinations` table (e.g. "http", "datadog").
*/
readonly type: string;
/**
* Forward a batch of log events to the destination.
*
* Implementations should:
* - Treat the call as atomic: either all events are accepted or an error
* is thrown so the caller can retry / back off.
* - Respect the timeout contract expected by the manager (default 30 s).
* - NOT swallow errors the manager relies on thrown exceptions to track
* failure state and apply exponential back-off.
*
* @param events A non-empty array of normalised log events to forward.
* @throws Any network, authentication, or serialisation error.
*/
send(events: LogEvent[]): Promise<void>;
}

View File

@@ -0,0 +1,134 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
// ---------------------------------------------------------------------------
// Log type identifiers
// ---------------------------------------------------------------------------
export type LogType = "request" | "action" | "access" | "connection";
export const LOG_TYPES: LogType[] = [
"request",
"action",
"access",
"connection"
];
// ---------------------------------------------------------------------------
// A normalised event ready to be forwarded to a destination
// ---------------------------------------------------------------------------
export interface LogEvent {
/** The auto-increment primary key from the source table */
id: number;
/** Which log table this event came from */
logType: LogType;
/** The organisation that owns this event */
orgId: string;
/** Unix epoch seconds taken from the record's own timestamp field */
timestamp: number;
/** Full row data from the source table, serialised as a plain object */
data: Record<string, unknown>;
}
// ---------------------------------------------------------------------------
// A batch of events destined for a single streaming target
// ---------------------------------------------------------------------------
export interface LogBatch {
destinationId: number;
logType: LogType;
events: LogEvent[];
}
// ---------------------------------------------------------------------------
// HTTP destination configuration (mirrors HttpConfig in the UI component)
// ---------------------------------------------------------------------------
export type AuthType = "none" | "bearer" | "basic" | "custom";
/**
* Controls how the batch of events is serialised into the HTTP request body.
*
* - `json_array` `[{…}, {…}]` — default; one POST per batch wrapped in a
* JSON array. Works with most generic webhooks and Datadog.
* - `ndjson` `{…}\n{…}` — newline-delimited JSON, one object per
* line. Required by Splunk HEC, Elastic/OpenSearch, Loki.
* - `json_single` one HTTP POST per event, body is a plain JSON object.
* Use only for endpoints that cannot handle batches at all.
*/
export type PayloadFormat = "json_array" | "ndjson" | "json_single";
export interface HttpConfig {
/** Human-readable label for the destination */
name: string;
/** Target URL that will receive POST requests */
url: string;
/** Authentication strategy to use */
authType: AuthType;
/** Used when authType === "bearer" */
bearerToken?: string;
/** Used when authType === "basic" must be "username:password" */
basicCredentials?: string;
/** Used when authType === "custom" header name */
customHeaderName?: string;
/** Used when authType === "custom" header value */
customHeaderValue?: string;
/** Additional static headers appended to every request */
headers: Array<{ key: string; value: string }>;
/** Whether to render a custom body template instead of the default shape */
/**
* How events are serialised into the request body.
* Defaults to `"json_array"` when absent.
*/
format?: PayloadFormat;
useBodyTemplate: boolean;
/**
* Handlebars-style template for the JSON body of each event.
*
* Supported placeholders:
* {{event}} the LogType string ("request", "action", etc.)
* {{timestamp}} ISO-8601 UTC string derived from the event's timestamp
* {{data}} raw JSON object (no surrounding quotes) of the full row
*
* Example:
* { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} }
*/
bodyTemplate?: string;
}
// ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table)
// ---------------------------------------------------------------------------
export interface StreamingCursor {
destinationId: number;
logType: LogType;
/** The `id` of the last row that was successfully forwarded */
lastSentId: number;
/** Epoch milliseconds of the last successful send (or null if never sent) */
lastSentAt: number | null;
}
// ---------------------------------------------------------------------------
// In-memory failure / back-off state tracked per destination
// ---------------------------------------------------------------------------
export interface DestinationFailureState {
/** How many consecutive send failures have occurred */
consecutiveFailures: number;
/** Date.now() value after which the destination may be retried */
nextRetryAt: number;
/** Date.now() value of the very first failure in the current streak */
firstFailedAt: number;
}

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
@@ -671,10 +671,7 @@ export async function getTraefikConfig(
// TODO: HOW TO HANDLE ^^^^^^ BETTER
const anySitesOnline = targets.some(
(target) =>
target.site.online ||
target.site.type === "local" ||
target.site.type === "wireguard"
(target) => target.site.online
);
return (
@@ -802,10 +799,7 @@ export async function getTraefikConfig(
servers: (() => {
// Check if any sites are online
const anySitesOnline = targets.some(
(target) =>
target.site.online ||
target.site.type === "local" ||
target.site.type === "wireguard"
(target) => target.site.online
);
return targets

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
@@ -217,7 +217,7 @@ export async function handleSubscriptionCreated(
subscriptionPriceId === priceSet[LicenseId.BIG_LICENSE]
) {
numUsers = 50;
numSites = 50;
numSites = 100;
} else {
logger.error(
`Unknown price ID ${subscriptionPriceId} for subscription ${subscription.id}`

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
@@ -22,11 +22,15 @@ import { OpenAPITags, registry } from "@server/openApi";
import { db, domainNamespaces, resources } from "@server/db";
import { inArray } from "drizzle-orm";
import { CheckDomainAvailabilityResponse } from "@server/routers/domain/types";
import { build } from "@server/build";
import { isSubscribed } from "#private/lib/isSubscribed";
import { tierMatrix } from "@server/lib/billing/tierMatrix";
const paramsSchema = z.strictObject({});
const querySchema = z.strictObject({
subdomain: z.string()
subdomain: z.string(),
// orgId: build === "saas" ? z.string() : z.string().optional() // Required for saas, optional otherwise
});
registry.registerPath({
@@ -58,6 +62,23 @@ export async function checkDomainNamespaceAvailability(
}
const { subdomain } = parsedQuery.data;
// if (
// build == "saas" &&
// !isSubscribed(orgId!, tierMatrix.domainNamespaces)
// ) {
// // return not available
// return response<CheckDomainAvailabilityResponse>(res, {
// data: {
// available: false,
// options: []
// },
// success: true,
// error: false,
// message: "Your current subscription does not support custom domain namespaces. Please upgrade to access this feature.",
// status: HttpCode.OK
// });
// }
const namespaces = await db.select().from(domainNamespaces);
let possibleDomains = namespaces.map((ns) => {
const desired = `${subdomain}.${ns.domainNamespaceId}`;

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.

View File

@@ -1,7 +1,7 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
@@ -22,6 +22,9 @@ import { eq, sql } from "drizzle-orm";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { isSubscribed } from "#private/lib/isSubscribed";
import { build } from "@server/build";
import { tierMatrix } from "@server/lib/billing/tierMatrix";
const paramsSchema = z.strictObject({});
@@ -37,7 +40,8 @@ const querySchema = z.strictObject({
.optional()
.default("0")
.transform(Number)
.pipe(z.int().nonnegative())
.pipe(z.int().nonnegative()),
// orgId: build === "saas" ? z.string() : z.string().optional() // Required for saas, optional otherwise
});
async function query(limit: number, offset: number) {
@@ -99,6 +103,26 @@ export async function listDomainNamespaces(
);
}
// if (
// build == "saas" &&
// !isSubscribed(orgId!, tierMatrix.domainNamespaces)
// ) {
// return response<ListDomainNamespacesResponse>(res, {
// data: {
// domainNamespaces: [],
// pagination: {
// total: 0,
// limit,
// offset
// }
// },
// success: true,
// error: false,
// message: "No namespaces found. Your current subscription does not support custom domain namespaces. Please upgrade to access this feature.",
// status: HttpCode.OK
// });
// }
const domainNamespacesList = await query(limit, offset);
const [{ count }] = await db

View File

@@ -0,0 +1,143 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import { logStreamingManager } from "#private/lib/logStreaming";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { encrypt } from "@server/lib/crypto";
import config from "@server/lib/config";
const paramsSchema = z.strictObject({
orgId: z.string().nonempty()
});
const bodySchema = z.strictObject({
type: z.string().nonempty(),
config: z.string().nonempty(),
enabled: z.boolean().optional().default(true),
sendConnectionLogs: z.boolean().optional().default(false),
sendRequestLogs: z.boolean().optional().default(false),
sendActionLogs: z.boolean().optional().default(false),
sendAccessLogs: z.boolean().optional().default(false)
});
export type CreateEventStreamingDestinationResponse = {
destinationId: number;
};
registry.registerPath({
method: "put",
path: "/org/{orgId}/event-streaming-destination",
description: "Create an event streaming destination for a specific organization.",
tags: [OpenAPITags.Org],
request: {
params: paramsSchema,
body: {
content: {
"application/json": {
schema: bodySchema
}
}
}
},
responses: {}
});
export async function createEventStreamingDestination(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId } = parsedParams.data;
const parsedBody = bodySchema.safeParse(req.body);
if (!parsedBody.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedBody.error).toString()
)
);
}
const { type, config: configToSet, enabled } = parsedBody.data;
const key = config.getRawConfig().server.secret!;
const encryptedConfig = encrypt(configToSet, key);
const now = Date.now();
const [destination] = await db
.insert(eventStreamingDestinations)
.values({
orgId,
type,
config: encryptedConfig,
enabled,
createdAt: now,
updatedAt: now,
sendAccessLogs: parsedBody.data.sendAccessLogs,
sendActionLogs: parsedBody.data.sendActionLogs,
sendConnectionLogs: parsedBody.data.sendConnectionLogs,
sendRequestLogs: parsedBody.data.sendRequestLogs
})
.returning();
// Seed cursors at the current max row id for every log type so this
// destination only receives events written *after* it was created.
// Fire-and-forget: a failure here is non-fatal; the manager has a lazy
// fallback that will seed at the next poll if these rows are missing.
logStreamingManager
.initializeCursorsForDestination(destination.destinationId, orgId)
.catch((err) =>
logger.error(
"createEventStreamingDestination: failed to initialise streaming cursors",
err
)
);
return response<CreateEventStreamingDestinationResponse>(res, {
data: {
destinationId: destination.destinationId
},
success: true,
error: false,
message: "Event streaming destination created successfully",
status: HttpCode.CREATED
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -0,0 +1,103 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { and, eq } from "drizzle-orm";
const paramsSchema = z
.object({
orgId: z.string().nonempty(),
destinationId: z.coerce.number<number>()
})
.strict();
registry.registerPath({
method: "delete",
path: "/org/{orgId}/event-streaming-destination/{destinationId}",
description: "Delete an event streaming destination for a specific organization.",
tags: [OpenAPITags.Org],
request: {
params: paramsSchema
},
responses: {}
});
export async function deleteEventStreamingDestination(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId, destinationId } = parsedParams.data;
const [existing] = await db
.select()
.from(eventStreamingDestinations)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
if (!existing) {
return next(
createHttpError(
HttpCode.NOT_FOUND,
"Event streaming destination not found"
)
);
}
await db
.delete(eventStreamingDestinations)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
return response<null>(res, {
data: null,
success: true,
error: false,
message: "Event streaming destination deleted successfully",
status: HttpCode.OK
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

Some files were not shown because too many files have changed in this diff Show More