Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions apps/sim/app/api/copilot/chat/delete/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { taskPubSub } from '@/lib/copilot/task-events'

const logger = createLogger('DeleteChatAPI')

Expand All @@ -22,11 +23,25 @@ export async function DELETE(request: NextRequest) {
const body = await request.json()
const parsed = DeleteChatSchema.parse(body)

// Delete the chat
await db.delete(copilotChats).where(eq(copilotChats.id, parsed.chatId))
const [deleted] = await db
.delete(copilotChats)
.where(and(eq(copilotChats.id, parsed.chatId), eq(copilotChats.userId, session.user.id)))
.returning({ workspaceId: copilotChats.workspaceId })

if (!deleted) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}

logger.info('Chat deleted', { chatId: parsed.chatId })

if (deleted.workspaceId) {
taskPubSub?.publishStatusChanged({
workspaceId: deleted.workspaceId,
chatId: parsed.chatId,
type: 'deleted',
})
}

return NextResponse.json({ success: true })
} catch (error) {
logger.error('Error deleting chat:', error)
Expand Down
14 changes: 12 additions & 2 deletions apps/sim/app/api/copilot/chat/rename/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { taskPubSub } from '@/lib/copilot/task-events'

const logger = createLogger('RenameChatAPI')

Expand All @@ -23,18 +24,27 @@ export async function PATCH(request: NextRequest) {
const body = await request.json()
const { chatId, title } = RenameChatSchema.parse(body)

const now = new Date()
const [updated] = await db
.update(copilotChats)
.set({ title, updatedAt: new Date() })
.set({ title, updatedAt: now, lastSeenAt: now })
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id)))
.returning({ id: copilotChats.id })
.returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId })

if (!updated) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}

logger.info('Chat renamed', { chatId, title })

if (updated.workspaceId) {
taskPubSub?.publishStatusChanged({
workspaceId: updated.workspaceId,
chatId,
type: 'renamed',
})
}

return NextResponse.json({ success: true })
} catch (error) {
if (error instanceof z.ZodError) {
Expand Down
121 changes: 17 additions & 104 deletions apps/sim/app/api/mcp/events/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,127 +8,40 @@
* Auth is handled via session cookies (EventSource sends cookies automatically).
*/

import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { createWorkspaceSSE } from '@/lib/events/sse-endpoint'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('McpEventsSSE')

export const dynamic = 'force-dynamic'

const HEARTBEAT_INTERVAL_MS = 30_000

export async function GET(request: NextRequest) {
const session = await getSession()
if (!session?.user?.id) {
return new Response('Unauthorized', { status: 401 })
}

const { searchParams } = new URL(request.url)
const workspaceId = searchParams.get('workspaceId')
if (!workspaceId) {
return new Response('Missing workspaceId query parameter', { status: 400 })
}

const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permissions) {
return new Response('Access denied to workspace', { status: 403 })
}

const encoder = new TextEncoder()
const unsubscribers: Array<() => void> = []
let cleaned = false

const cleanup = () => {
if (cleaned) return
cleaned = true
for (const unsub of unsubscribers) {
unsub()
}
decrementSSEConnections('mcp-events')
logger.info(`SSE connection closed for workspace ${workspaceId}`)
}

const stream = new ReadableStream({
start(controller) {
incrementSSEConnections('mcp-events')

const send = (eventName: string, data: Record<string, unknown>) => {
if (cleaned) return
try {
controller.enqueue(
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
)
} catch {
// Stream already closed
}
}

// Subscribe to external MCP server tool changes
if (mcpConnectionManager) {
const unsub = mcpConnectionManager.subscribe((event) => {
export const GET = createWorkspaceSSE({
label: 'mcp-events',
subscriptions: [
{
subscribe: (workspaceId, send) => {
if (!mcpConnectionManager) return () => {}
return mcpConnectionManager.subscribe((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'external',
serverId: event.serverId,
timestamp: event.timestamp,
})
})
unsubscribers.push(unsub)
}

// Subscribe to workflow CRUD tool changes
if (mcpPubSub) {
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
},
},
{
subscribe: (workspaceId, send) => {
if (!mcpPubSub) return () => {}
return mcpPubSub.onWorkflowToolsChanged((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'workflow',
serverId: event.serverId,
timestamp: Date.now(),
})
})
unsubscribers.push(unsub)
}

// Heartbeat to keep the connection alive
const heartbeat = setInterval(() => {
if (cleaned) {
clearInterval(heartbeat)
return
}
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
clearInterval(heartbeat)
}
}, HEARTBEAT_INTERVAL_MS)
unsubscribers.push(() => clearInterval(heartbeat))

// Cleanup when client disconnects
request.signal.addEventListener(
'abort',
() => {
cleanup()
try {
controller.close()
} catch {
// Already closed
}
},
{ once: true }
)

logger.info(`SSE connection opened for workspace ${workspaceId}`)
},
},
cancel() {
cleanup()
},
})

return new Response(stream, { headers: SSE_HEADERS })
}
],
})
10 changes: 10 additions & 0 deletions apps/sim/app/api/mothership/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
import { taskPubSub } from '@/lib/copilot/task-events'
import { generateWorkspaceContext } from '@/lib/copilot/workspace-context'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

Expand Down Expand Up @@ -146,6 +147,7 @@ export async function POST(req: NextRequest) {
if (updated) {
const freshMessages: any[] = Array.isArray(updated.messages) ? updated.messages : []
conversationHistory = freshMessages.filter((m: any) => m.id !== userMessageId)
taskPubSub?.publishStatusChanged({ workspaceId, chatId: actualChatId, type: 'started' })
}
}

Expand Down Expand Up @@ -182,6 +184,7 @@ export async function POST(req: NextRequest) {
message,
titleModel: 'claude-opus-4-5',
requestId: tracker.requestId,
workspaceId,
orchestrateOptions: {
userId: authenticatedUserId,
workspaceId,
Expand Down Expand Up @@ -243,8 +246,15 @@ export async function POST(req: NextRequest) {
.set({
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`,
updatedAt: new Date(),
})
.where(eq(copilotChats.id, actualChatId))

taskPubSub?.publishStatusChanged({
workspaceId,
chatId: actualChatId,
type: 'completed',
})
}
} catch (error) {
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
Expand Down
12 changes: 11 additions & 1 deletion apps/sim/app/api/mothership/chat/stop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { and, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { taskPubSub } from '@/lib/copilot/task-events'

const logger = createLogger('MothershipChatStopAPI')

Expand Down Expand Up @@ -78,7 +79,7 @@ export async function POST(req: NextRequest) {
setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`
}

await db
const [updated] = await db
.update(copilotChats)
.set(setClause)
.where(
Expand All @@ -88,6 +89,15 @@ export async function POST(req: NextRequest) {
eq(copilotChats.conversationId, streamId)
)
)
.returning({ workspaceId: copilotChats.workspaceId })

if (updated?.workspaceId) {
taskPubSub?.publishStatusChanged({
workspaceId: updated.workspaceId,
chatId,
type: 'completed',
})
}

return NextResponse.json({ success: true })
} catch (error) {
Expand Down
43 changes: 43 additions & 0 deletions apps/sim/app/api/mothership/chats/read/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
createInternalServerErrorResponse,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'

const logger = createLogger('MarkTaskReadAPI')

const MarkReadSchema = z.object({
chatId: z.string().min(1),
})

export async function POST(request: NextRequest) {
try {
const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly()
if (!isAuthenticated || !userId) {
return createUnauthorizedResponse()
}

const body = await request.json()
const { chatId } = MarkReadSchema.parse(body)

await db
.update(copilotChats)
.set({ lastSeenAt: new Date() })
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))

return NextResponse.json({ success: true })
} catch (error) {
if (error instanceof z.ZodError) {
return createBadRequestResponse('chatId is required')
}
logger.error('Error marking task as read:', error)
return createInternalServerErrorResponse('Failed to mark task as read')
}
}
8 changes: 8 additions & 0 deletions apps/sim/app/api/mothership/chats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
createInternalServerErrorResponse,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import { taskPubSub } from '@/lib/copilot/task-events'

const logger = createLogger('MothershipChatsAPI')

Expand All @@ -34,6 +35,8 @@ export async function GET(request: NextRequest) {
id: copilotChats.id,
title: copilotChats.title,
updatedAt: copilotChats.updatedAt,
conversationId: copilotChats.conversationId,
lastSeenAt: copilotChats.lastSeenAt,
})
.from(copilotChats)
.where(
Expand Down Expand Up @@ -70,6 +73,7 @@ export async function POST(request: NextRequest) {
const body = await request.json()
const { workspaceId } = CreateChatSchema.parse(body)

const now = new Date()
const [chat] = await db
.insert(copilotChats)
.values({
Expand All @@ -79,9 +83,13 @@ export async function POST(request: NextRequest) {
title: null,
model: 'claude-opus-4-5',
messages: [],
updatedAt: now,
lastSeenAt: now,
})
.returning({ id: copilotChats.id })

taskPubSub?.publishStatusChanged({ workspaceId, chatId: chat.id, type: 'created' })

return NextResponse.json({ success: true, id: chat.id })
} catch (error) {
if (error instanceof z.ZodError) {
Expand Down
Loading