Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions apps/sim/app/api/a2a/serve/[agentId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import {
Expand Down Expand Up @@ -631,11 +630,9 @@ async function handleMessageStream(
}

const encoder = new TextEncoder()
let messageStreamDecremented = false

const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-message')
const sendEvent = (event: string, data: unknown) => {
try {
const jsonRpcResponse = {
Expand Down Expand Up @@ -845,19 +842,10 @@ async function handleMessageStream(
})
} finally {
await releaseLock(lockKey, lockValue)
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
controller.close()
}
},
cancel() {
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
},
cancel() {},
})

return new NextResponse(stream, {
Expand Down Expand Up @@ -1042,22 +1030,16 @@ async function handleTaskResubscribe(
{ once: true }
)

let sseDecremented = false
const cleanup = () => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('a2a-resubscribe')
}
}

const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-resubscribe')
const sendEvent = (event: string, data: unknown): boolean => {
if (isCancelled || abortSignal.aborted) return false
try {
Expand Down
4 changes: 0 additions & 4 deletions apps/sim/app/api/mcp/events/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('McpEventsSSE')
Expand Down Expand Up @@ -50,14 +49,11 @@ export async function GET(request: NextRequest) {
for (const unsub of unsubscribers) {
unsub()
}
decrementSSEConnections('mcp-events')
logger.info(`SSE connection closed for workspace ${workspaceId}`)
}

const stream = new ReadableStream({
start(controller) {
incrementSSEConnections('mcp-events')

const send = (eventName: string, data: Record<string, unknown>) => {
if (cleaned) return
try {
Expand Down
16 changes: 1 addition & 15 deletions apps/sim/app/api/wand/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { enrichTableSchema } from '@/lib/table/llm/wand'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
Expand Down Expand Up @@ -331,14 +330,10 @@ export async function POST(req: NextRequest) {
const encoder = new TextEncoder()
const decoder = new TextDecoder()

let wandStreamClosed = false
const readable = new ReadableStream({
async start(controller) {
incrementSSEConnections('wand')
const reader = response.body?.getReader()
if (!reader) {
wandStreamClosed = true
decrementSSEConnections('wand')
controller.close()
return
}
Expand Down Expand Up @@ -483,18 +478,9 @@ export async function POST(req: NextRequest) {
controller.close()
} finally {
reader.releaseLock()
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
}
},
cancel() {
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
},
cancel() {},
})

return new Response(readable, {
Expand Down
11 changes: 0 additions & 11 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
Expand Down Expand Up @@ -764,7 +763,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let sseDecremented = false

const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
Expand All @@ -775,7 +773,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('workflow-execute')
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null

const sendEvent = (event: ExecutionEvent) => {
Expand Down Expand Up @@ -1159,10 +1156,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (executionId) {
await cleanupExecutionBase64Cache(executionId)
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
Expand All @@ -1174,10 +1167,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client disconnected from SSE stream`)
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
},
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'

Expand Down Expand Up @@ -74,10 +73,8 @@ export async function GET(

let closed = false

let sseDecremented = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('execution-stream-reconnect')
let lastEventId = fromEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS

Expand Down Expand Up @@ -145,20 +142,11 @@ export async function GET(
controller.close()
} catch {}
}
} finally {
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from reconnection stream', { executionId })
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
},
})

Expand Down
22 changes: 2 additions & 20 deletions apps/sim/lib/monitoring/memory-telemetry.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
/**
* Periodic memory telemetry for diagnosing heap growth in production.
* Logs process.memoryUsage(), V8 heap stats, and active SSE connection
* counts every 60s, enabling correlation between connection leaks and
* memory spikes.
* Periodic memory telemetry for monitoring heap growth in production.
* Logs process.memoryUsage() and V8 heap stats every 60s.
*/

import v8 from 'node:v8'
import { createLogger } from '@sim/logger'
import {
getActiveSSEConnectionCount,
getActiveSSEConnectionsByRoute,
} from '@/lib/monitoring/sse-connections'

const logger = createLogger('MemoryTelemetry', { logLevel: 'INFO' })

Expand All @@ -23,16 +17,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
started = true

const timer = setInterval(() => {
// Trigger opportunistic (non-blocking) garbage collection if running on Bun.
// This signals JSC GC + mimalloc page purge without blocking the event loop,
// helping reclaim RSS that mimalloc otherwise retains under sustained load.
const bunGlobal = (globalThis as Record<string, unknown>).Bun as
| { gc?: (force: boolean) => void }
| undefined
if (typeof bunGlobal?.gc === 'function') {
bunGlobal.gc(false)
}

const mem = process.memoryUsage()
const heap = v8.getHeapStatistics()

Expand All @@ -49,8 +33,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
? process.getActiveResourcesInfo().length
: -1,
uptimeMin: Math.round(process.uptime() / 60),
activeSSEConnections: getActiveSSEConnectionCount(),
sseByRoute: getActiveSSEConnectionsByRoute(),
})
}, intervalMs)
timer.unref()
Expand Down
27 changes: 0 additions & 27 deletions apps/sim/lib/monitoring/sse-connections.ts

This file was deleted.