From 10fa9274d01940a902898574bf1b8f3162604fab Mon Sep 17 00:00:00 2001
From: Owen
Date: Fri, 8 May 2026 16:26:10 -0700
Subject: [PATCH] Add streaming errors for debug
---
messages/en-US.json | 2 +-
server/db/pg/schema/privateSchema.ts | 2 +
server/db/sqlite/schema/privateSchema.ts | 2 +
.../lib/logStreaming/LogStreamingManager.ts | 49 ++++++++
.../listEventStreamingDestinations.ts | 5 +-
.../[orgId]/settings/logs/streaming/page.tsx | 38 ++++++-
src/components/HttpDestinationCredenza.tsx | 105 ++++++++++++------
src/components/S3DestinationCredenza.tsx | 10 ++
8 files changed, 176 insertions(+), 37 deletions(-)
diff --git a/messages/en-US.json b/messages/en-US.json
index 0e9cb5786..9a23043d5 100644
--- a/messages/en-US.json
+++ b/messages/en-US.json
@@ -3062,7 +3062,7 @@
"streamingDatadogTitle": "Datadog",
"streamingDatadogDescription": "Forward events directly to your Datadog account.",
"streamingTypePickerDescription": "Choose a destination type to get started.",
- "streamingFailedToLoad": "Failed to load destinations",
+ "streamingLastSyncError": "An error occurred on the last sync",
"streamingUnexpectedError": "An unexpected error occurred.",
"streamingFailedToUpdate": "Failed to update destination",
"streamingDeletedSuccess": "Destination deleted successfully",
diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts
index 0f1914fad..6137743bf 100644
--- a/server/db/pg/schema/privateSchema.ts
+++ b/server/db/pg/schema/privateSchema.ts
@@ -439,6 +439,8 @@ export const eventStreamingDestinations = pgTable(
type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: boolean("enabled").notNull().default(true),
+ lastError: text("lastError"), // last send error message, null if healthy
+ lastErrorAt: bigint("lastErrorAt", { mode: "number" }), // epoch ms of last error, null if healthy
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
updatedAt: bigint("updatedAt", { mode: "number" }).notNull()
}
diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts
index 05c917887..a25183055 100644
--- a/server/db/sqlite/schema/privateSchema.ts
+++ b/server/db/sqlite/schema/privateSchema.ts
@@ -445,6 +445,8 @@ export const eventStreamingDestinations = sqliteTable(
enabled: integer("enabled", { mode: "boolean" })
.notNull()
.default(true),
+ lastError: text("lastError"), // last send error message, null if healthy
+ lastErrorAt: integer("lastErrorAt"), // epoch ms of last error, null if healthy
createdAt: integer("createdAt").notNull(),
updatedAt: integer("updatedAt").notNull()
}
diff --git a/server/private/lib/logStreaming/LogStreamingManager.ts b/server/private/lib/logStreaming/LogStreamingManager.ts
index a9575fec6..03efc2809 100644
--- a/server/private/lib/logStreaming/LogStreamingManager.ts
+++ b/server/private/lib/logStreaming/LogStreamingManager.ts
@@ -313,6 +313,7 @@ export class LogStreamingManager {
if (enabledTypes.length === 0) return;
let anyFailure = false;
+ let firstError: string | null = null;
for (const logType of enabledTypes) {
if (!this.isRunning) break;
@@ -320,6 +321,10 @@ export class LogStreamingManager {
await this.processLogType(dest, provider, logType);
} catch (err) {
anyFailure = true;
+ if (firstError === null) {
+ firstError =
+ err instanceof Error ? err.message : String(err);
+ }
logger.error(
`LogStreamingManager: failed to process "${logType}" logs ` +
`for destination ${dest.destinationId}`,
@@ -330,6 +335,10 @@ export class LogStreamingManager {
if (anyFailure) {
this.recordFailure(dest.destinationId);
+ await this.setDestinationError(
+ dest.destinationId,
+ firstError ?? "Unknown error"
+ );
} else {
// Any success resets the failure/back-off state
if (this.failures.has(dest.destinationId)) {
@@ -338,6 +347,7 @@ export class LogStreamingManager {
`LogStreamingManager: destination ${dest.destinationId} recovered`
);
}
+ await this.clearDestinationError(dest.destinationId);
}
}
@@ -759,6 +769,45 @@ export class LogStreamingManager {
// DB helpers
// -------------------------------------------------------------------------
+ private async setDestinationError(
+ destinationId: number,
+ errorMessage: string
+ ): Promise {
+ // Truncate to 1000 chars so it fits comfortably in the text column.
+ const truncated = errorMessage.slice(0, 1000);
+ try {
+ await db
+ .update(eventStreamingDestinations)
+ .set({ lastError: truncated, lastErrorAt: Date.now() })
+ .where(
+ eq(eventStreamingDestinations.destinationId, destinationId)
+ );
+ } catch (err) {
+ logger.warn(
+ `LogStreamingManager: could not persist error status for destination ${destinationId}`,
+ err
+ );
+ }
+ }
+
+ private async clearDestinationError(destinationId: number): Promise {
+ try {
+ // Only update if there is actually an error stored, to avoid
+ // unnecessary writes on every successful poll cycle.
+ await db
+ .update(eventStreamingDestinations)
+ .set({ lastError: null, lastErrorAt: null })
+ .where(
+ eq(eventStreamingDestinations.destinationId, destinationId)
+ );
+ } catch (err) {
+ logger.warn(
+ `LogStreamingManager: could not clear error status for destination ${destinationId}`,
+ err
+ );
+ }
+ }
+
private async loadEnabledDestinations(): Promise<
EventStreamingDestination[]
> {
diff --git a/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts b/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts
index 10a6c3600..27b5d9a5b 100644
--- a/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts
+++ b/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts
@@ -51,6 +51,8 @@ export type ListEventStreamingDestinationsResponse = {
type: string;
config: string;
enabled: boolean;
+ lastError: string | null;
+ lastErrorAt: number | null;
createdAt: number;
updatedAt: number;
sendConnectionLogs: boolean;
@@ -79,7 +81,8 @@ async function query(orgId: string, limit: number, offset: number) {
registry.registerPath({
method: "get",
path: "/org/{orgId}/event-streaming-destination",
- description: "List all event streaming destinations for a specific organization.",
+ description:
+ "List all event streaming destinations for a specific organization.",
tags: [OpenAPITags.Org],
request: {
query: querySchema,
diff --git a/src/app/[orgId]/settings/logs/streaming/page.tsx b/src/app/[orgId]/settings/logs/streaming/page.tsx
index 661fbd786..8579527d0 100644
--- a/src/app/[orgId]/settings/logs/streaming/page.tsx
+++ b/src/app/[orgId]/settings/logs/streaming/page.tsx
@@ -22,7 +22,18 @@ import {
} from "@app/components/Credenza";
import { Button } from "@app/components/ui/button";
import { Switch } from "@app/components/ui/switch";
-import { Globe, MoreHorizontal, Plus } from "lucide-react";
+import {
+ Globe,
+ MoreHorizontal,
+ Plus,
+ AlertCircle,
+ ChevronDown
+} from "lucide-react";
+import {
+ Popover,
+ PopoverContent,
+ PopoverTrigger
+} from "@app/components/ui/popover";
import { AxiosResponse } from "axios";
import { build } from "@server/build";
import Image from "next/image";
@@ -153,6 +164,31 @@ function DestinationCard({
)}
+ {/* Error indicator */}
+ {destination.lastError && (
+
+
+
+
+
+ {destination.lastError}
+
+
+ )}
+
{/* Footer: edit button + three-dots menu */}
- updateRow(i, "value", e.target.value)
- }
+ onChange={(e) => updateRow(i, "value", e.target.value)}
placeholder={t("httpDestHeaderValuePlaceholder")}
className="flex-1"
/>
@@ -200,10 +201,7 @@ export function HttpDestinationCredenza({
if (!raw) return null;
try {
const parsed = new URL(raw);
- if (
- parsed.protocol !== "http:" &&
- parsed.protocol !== "https:"
- ) {
+ if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
return t("httpDestUrlErrorHttpRequired");
}
if (build === "saas" && parsed.protocol !== "https:") {
@@ -216,9 +214,7 @@ export function HttpDestinationCredenza({
})();
const isValid =
- cfg.name.trim() !== "" &&
- cfg.url.trim() !== "" &&
- urlError === null;
+ cfg.name.trim() !== "" && cfg.url.trim() !== "" && urlError === null;
async function handleSave() {
if (!isValid) return;
@@ -253,10 +249,7 @@ export function HttpDestinationCredenza({
title: editing
? t("httpDestUpdateFailed")
: t("httpDestCreateFailed"),
- description: formatAxiosError(
- e,
- t("streamingUnexpectedError")
- )
+ description: formatAxiosError(e, t("streamingUnexpectedError"))
});
} finally {
setSaving(false);
@@ -280,6 +273,14 @@ export function HttpDestinationCredenza({
+ {editing?.lastError && (
+
+
+
+ {editing.lastError}
+
+
+ )}
- {t("httpDestAuthNoneDescription")}
+ {t(
+ "httpDestAuthNoneDescription"
+ )}
@@ -375,15 +378,21 @@ export function HttpDestinationCredenza({
htmlFor="auth-bearer"
className="cursor-pointer font-medium"
>
- {t("httpDestAuthBearerTitle")}
+ {t(
+ "httpDestAuthBearerTitle"
+ )}
- {t("httpDestAuthBearerDescription")}
+ {t(
+ "httpDestAuthBearerDescription"
+ )}
{cfg.authType === "bearer" && (
- {t("httpDestAuthBasicTitle")}
+ {t(
+ "httpDestAuthBasicTitle"
+ )}
- {t("httpDestAuthBasicDescription")}
+ {t(
+ "httpDestAuthBasicDescription"
+ )}
{cfg.authType === "basic" && (
- {t("httpDestAuthCustomTitle")}
+ {t(
+ "httpDestAuthCustomTitle"
+ )}
- {t("httpDestAuthCustomDescription")}
+ {t(
+ "httpDestAuthCustomDescription"
+ )}
{cfg.authType === "custom" && (
@@ -616,7 +643,9 @@ export function HttpDestinationCredenza({
{t("httpDestFormatNdjsonTitle")}
- {t("httpDestFormatNdjsonDescription")}
+ {t(
+ "httpDestFormatNdjsonDescription"
+ )}
@@ -636,7 +665,9 @@ export function HttpDestinationCredenza({
{t("httpDestFormatSingleTitle")}
- {t("httpDestFormatSingleDescription")}
+ {t(
+ "httpDestFormatSingleDescription"
+ )}
@@ -717,7 +748,9 @@ export function HttpDestinationCredenza({
{t("httpDestConnectionLogsTitle")}
- {t("httpDestConnectionLogsDescription")}
+ {t(
+ "httpDestConnectionLogsDescription"
+ )}
@@ -739,7 +772,9 @@ export function HttpDestinationCredenza({
{t("httpDestRequestLogsTitle")}
- {t("httpDestRequestLogsDescription")}
+ {t(
+ "httpDestRequestLogsDescription"
+ )}
@@ -764,10 +799,12 @@ export function HttpDestinationCredenza({
loading={saving}
disabled={!isValid || saving}
>
- {editing ? t("httpDestSaveChanges") : t("httpDestCreateDestination")}
+ {editing
+ ? t("httpDestSaveChanges")
+ : t("httpDestCreateDestination")}
);
-}
\ No newline at end of file
+}
diff --git a/src/components/S3DestinationCredenza.tsx b/src/components/S3DestinationCredenza.tsx
index 03b055f58..e6c128805 100644
--- a/src/components/S3DestinationCredenza.tsx
+++ b/src/components/S3DestinationCredenza.tsx
@@ -18,6 +18,8 @@ import { Switch } from "@app/components/ui/switch";
import { HorizontalTabs } from "@app/components/HorizontalTabs";
import { RadioGroup, RadioGroupItem } from "@app/components/ui/radio-group";
import { Checkbox } from "@app/components/ui/checkbox";
+import { AlertCircle } from "lucide-react";
+import { Alert, AlertDescription } from "@app/components/ui/alert";
import { createApiClient, formatAxiosError } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext";
import { toast } from "@app/hooks/useToast";
@@ -164,6 +166,14 @@ export function S3DestinationCredenza({
+ {editing?.lastError && (
+
+
+
+ {editing.lastError}
+
+
+ )}