diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index 640b2d01808..3f02f330e8e 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -324,7 +324,9 @@ vi.mock('@/lib/webhooks/processor', () => ({ return null } ), - checkWebhookPreprocessing: vi.fn().mockResolvedValue(null), + checkWebhookPreprocessing: vi + .fn() + .mockResolvedValue({ error: null, actorUserId: 'test-user-id' }), formatProviderErrorResponse: vi.fn().mockImplementation((_webhook, error, status) => { const { NextResponse } = require('next/server') return NextResponse.json({ error }, { status }) diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.ts index ba08df3907e..139bdf7a483 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.ts @@ -4,7 +4,6 @@ import { generateRequestId } from '@/lib/core/utils/request' import { checkWebhookPreprocessing, findAllWebhooksForPath, - formatProviderErrorResponse, handlePreDeploymentVerification, handleProviderChallenges, handleProviderReachabilityTest, @@ -82,7 +81,6 @@ export async function POST( requestId ) if (authError) { - // For multi-webhook, log and continue to next webhook if (webhooksForPath.length > 1) { logger.warn(`[${requestId}] Auth failed for webhook ${foundWebhook.id}, continuing to next`) continue @@ -92,39 +90,18 @@ export async function POST( const reachabilityResponse = handleProviderReachabilityTest(foundWebhook, body, requestId) if (reachabilityResponse) { - // Reachability test should return immediately for the first webhook return reachabilityResponse } - let preprocessError: NextResponse | null = null - try { - preprocessError = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId) - if (preprocessError) { - if (webhooksForPath.length > 1) { - logger.warn( - `[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next` - ) - continue - } - return preprocessError - } - } catch (error) { - logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, { - error: error instanceof Error ? error.message : String(error), - stack: error instanceof Error ? error.stack : undefined, - webhookId: foundWebhook.id, - workflowId: foundWorkflow.id, - }) - + const preprocessResult = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId) + if (preprocessResult.error) { if (webhooksForPath.length > 1) { + logger.warn( + `[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next` + ) continue } - - return formatProviderErrorResponse( - foundWebhook, - 'An unexpected error occurred during preprocessing', - 500 - ) + return preprocessResult.error } if (foundWebhook.blockId) { @@ -152,6 +129,7 @@ export async function POST( const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, { requestId, path, + actorUserId: preprocessResult.actorUserId, }) responses.push(response) } diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 5113dc075c5..26b2179012c 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -1,18 +1,13 @@ import { db } from '@sim/db' -import { webhook, workflow as workflowTable } from '@sim/db/schema' +import { account, webhook } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' -import { getHighestPrioritySubscription } from '@/lib/billing' -import { - createTimeoutAbortController, - getExecutionTimeout, - getTimeoutErrorMessage, -} from '@/lib/core/execution-limits' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency' -import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { processExecutionFiles } from '@/lib/execution/files' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor' @@ -20,7 +15,7 @@ import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webho import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils' -import { getWorkflowById } from '@/lib/workflows/utils' +import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils' import { getBlock } from '@/blocks' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata } from '@/executor/execution/types' @@ -109,8 +104,8 @@ export type WebhookExecutionPayload = { headers: Record path: string blockId?: string + workspaceId?: string credentialId?: string - credentialAccountUserId?: string } export async function executeWebhookJob(payload: WebhookExecutionPayload) { @@ -143,6 +138,22 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) { ) } +/** + * Resolve the account userId for a credential + */ +async function resolveCredentialAccountUserId(credentialId: string): Promise { + const resolved = await resolveOAuthAccountId(credentialId) + if (!resolved) { + return undefined + } + const [credentialRecord] = await db + .select({ userId: account.userId }) + .from(account) + .where(eq(account.id, resolved.accountId)) + .limit(1) + return credentialRecord?.userId +} + async function executeWebhookJobInternal( payload: WebhookExecutionPayload, executionId: string, @@ -155,17 +166,56 @@ async function executeWebhookJobInternal( requestId ) - const userSubscription = await getHighestPrioritySubscription(payload.userId) - const asyncTimeout = getExecutionTimeout( - userSubscription?.plan as SubscriptionPlan | undefined, - 'async' - ) + // Resolve workflow record, billing actor, subscription, and timeout + const preprocessResult = await preprocessExecution({ + workflowId: payload.workflowId, + userId: payload.userId, + triggerType: 'webhook', + executionId, + requestId, + checkRateLimit: false, + checkDeployment: false, + skipUsageLimits: true, + workspaceId: payload.workspaceId, + loggingSession, + }) + + if (!preprocessResult.success) { + throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job') + } + + const { workflowRecord, executionTimeout } = preprocessResult + if (!workflowRecord) { + throw new Error(`Workflow ${payload.workflowId} not found during preprocessing`) + } + + const workspaceId = workflowRecord.workspaceId + if (!workspaceId) { + throw new Error(`Workflow ${payload.workflowId} has no associated workspace`) + } + + const workflowVariables = (workflowRecord.variables as Record) || {} + const asyncTimeout = executionTimeout?.async ?? 120_000 const timeoutController = createTimeoutAbortController(asyncTimeout) let deploymentVersionId: string | undefined try { - const workflowData = await loadDeployedWorkflowState(payload.workflowId) + // Parallelize workflow state, webhook record, and credential resolution + const [workflowData, webhookRows, resolvedCredentialUserId] = await Promise.all([ + loadDeployedWorkflowState(payload.workflowId, workspaceId), + db.select().from(webhook).where(eq(webhook.id, payload.webhookId)).limit(1), + payload.credentialId + ? resolveCredentialAccountUserId(payload.credentialId) + : Promise.resolve(undefined), + ]) + const credentialAccountUserId = resolvedCredentialUserId + if (payload.credentialId && !credentialAccountUserId) { + logger.warn( + `[${requestId}] Failed to resolve credential account for credential ${payload.credentialId}` + ) + } + if (!workflowData) { throw new Error( 'Workflow state not found. The workflow may not be deployed or the deployment data may be corrupted.' @@ -178,28 +228,11 @@ async function executeWebhookJobInternal( ? (workflowData.deploymentVersionId as string) : undefined - const wfRows = await db - .select({ workspaceId: workflowTable.workspaceId, variables: workflowTable.variables }) - .from(workflowTable) - .where(eq(workflowTable.id, payload.workflowId)) - .limit(1) - const workspaceId = wfRows[0]?.workspaceId - if (!workspaceId) { - throw new Error(`Workflow ${payload.workflowId} has no associated workspace`) - } - const workflowVariables = (wfRows[0]?.variables as Record) || {} - // Handle special Airtable case if (payload.provider === 'airtable') { logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`) - // Load the actual webhook record from database to get providerConfig - const [webhookRecord] = await db - .select() - .from(webhook) - .where(eq(webhook.id, payload.webhookId)) - .limit(1) - + const webhookRecord = webhookRows[0] if (!webhookRecord) { throw new Error(`Webhook record not found: ${payload.webhookId}`) } @@ -210,29 +243,20 @@ async function executeWebhookJobInternal( providerConfig: webhookRecord.providerConfig, } - // Create a mock workflow object for Airtable processing const mockWorkflow = { id: payload.workflowId, userId: payload.userId, } - // Get the processed Airtable input const airtableInput = await fetchAndProcessAirtablePayloads( webhookData, mockWorkflow, requestId ) - // If we got input (changes), execute the workflow like other providers if (airtableInput) { logger.info(`[${requestId}] Executing workflow with Airtable changes`) - // Get workflow for core execution - const workflow = await getWorkflowById(payload.workflowId) - if (!workflow) { - throw new Error(`Workflow ${payload.workflowId} not found`) - } - const metadata: ExecutionMetadata = { requestId, executionId, @@ -240,13 +264,13 @@ async function executeWebhookJobInternal( workspaceId, userId: payload.userId, sessionUserId: undefined, - workflowUserId: workflow.userId, + workflowUserId: workflowRecord.userId, triggerType: payload.provider || 'webhook', triggerBlockId: payload.blockId, useDraftState: false, startTime: new Date().toISOString(), isClientSession: false, - credentialAccountUserId: payload.credentialAccountUserId, + credentialAccountUserId, workflowStateOverride: { blocks, edges, @@ -258,7 +282,7 @@ async function executeWebhookJobInternal( const snapshot = new ExecutionSnapshot( metadata, - workflow, + workflowRecord, airtableInput, workflowVariables, [] @@ -329,7 +353,6 @@ async function executeWebhookJobInternal( // No changes to process logger.info(`[${requestId}] No Airtable changes to process`) - // Start logging session so the complete call has a log entry to update await loggingSession.safeStart({ userId: payload.userId, workspaceId, @@ -357,13 +380,6 @@ async function executeWebhookJobInternal( } // Format input for standard webhooks - // Load the actual webhook to get providerConfig (needed for Teams credentialId) - const webhookRows = await db - .select() - .from(webhook) - .where(eq(webhook.id, payload.webhookId)) - .limit(1) - const actualWebhook = webhookRows.length > 0 ? webhookRows[0] @@ -386,7 +402,6 @@ async function executeWebhookJobInternal( if (!input && payload.provider === 'whatsapp') { logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`) - // Start logging session so the complete call has a log entry to update await loggingSession.safeStart({ userId: payload.userId, workspaceId, @@ -452,7 +467,6 @@ async function executeWebhookJobInternal( } } catch (error) { logger.error(`[${requestId}] Error processing trigger file outputs:`, error) - // Continue without processing attachments rather than failing execution } } @@ -499,18 +513,11 @@ async function executeWebhookJobInternal( } } catch (error) { logger.error(`[${requestId}] Error processing generic webhook files:`, error) - // Continue without processing files rather than failing execution } } logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`) - // Get workflow for core execution - const workflow = await getWorkflowById(payload.workflowId) - if (!workflow) { - throw new Error(`Workflow ${payload.workflowId} not found`) - } - const metadata: ExecutionMetadata = { requestId, executionId, @@ -518,13 +525,13 @@ async function executeWebhookJobInternal( workspaceId, userId: payload.userId, sessionUserId: undefined, - workflowUserId: workflow.userId, + workflowUserId: workflowRecord.userId, triggerType: payload.provider || 'webhook', triggerBlockId: payload.blockId, useDraftState: false, startTime: new Date().toISOString(), isClientSession: false, - credentialAccountUserId: payload.credentialAccountUserId, + credentialAccountUserId, workflowStateOverride: { blocks, edges, @@ -536,7 +543,13 @@ async function executeWebhookJobInternal( const triggerInput = input || {} - const snapshot = new ExecutionSnapshot(metadata, workflow, triggerInput, workflowVariables, []) + const snapshot = new ExecutionSnapshot( + metadata, + workflowRecord, + triggerInput, + workflowVariables, + [] + ) const executionResult = await executeWorkflowCore({ snapshot, @@ -611,23 +624,9 @@ async function executeWebhookJobInternal( }) try { - const wfRow = await db - .select({ workspaceId: workflowTable.workspaceId }) - .from(workflowTable) - .where(eq(workflowTable.id, payload.workflowId)) - .limit(1) - const errorWorkspaceId = wfRow[0]?.workspaceId - - if (!errorWorkspaceId) { - logger.warn( - `[${requestId}] Cannot log error: workflow ${payload.workflowId} has no workspace` - ) - throw error - } - await loggingSession.safeStart({ userId: payload.userId, - workspaceId: errorWorkspaceId, + workspaceId, variables: {}, triggerData: { isTest: false, diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 25a1c6ae2cd..5e3eaa49f6c 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -1,5 +1,5 @@ import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db' -import { account, credentialSet, subscription } from '@sim/db/schema' +import { credentialSet, subscription } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' @@ -26,8 +26,6 @@ import { validateTypeformSignature, verifyProviderWebhook, } from '@/lib/webhooks/utils.server' -import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' -import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils' import { executeWebhookJob } from '@/background/webhook-execution' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' import { isConfluencePayloadMatch } from '@/triggers/confluence/utils' @@ -41,6 +39,12 @@ export interface WebhookProcessorOptions { requestId: string path?: string webhookId?: string + actorUserId?: string +} + +export interface WebhookPreprocessingResult { + error: NextResponse | null + actorUserId?: string } function getExternalUrl(request: NextRequest): string { @@ -836,7 +840,7 @@ export async function checkWebhookPreprocessing( foundWorkflow: any, foundWebhook: any, requestId: string -): Promise { +): Promise { try { const executionId = uuidv4() @@ -849,6 +853,7 @@ export async function checkWebhookPreprocessing( checkRateLimit: true, checkDeployment: true, workspaceId: foundWorkflow.workspaceId, + workflowRecord: foundWorkflow, }) if (!preprocessResult.success) { @@ -860,33 +865,39 @@ export async function checkWebhookPreprocessing( }) if (foundWebhook.provider === 'microsoft-teams') { - return NextResponse.json( - { - type: 'message', - text: error.message, - }, - { status: error.statusCode } - ) + return { + error: NextResponse.json( + { + type: 'message', + text: error.message, + }, + { status: error.statusCode } + ), + } } - return NextResponse.json({ error: error.message }, { status: error.statusCode }) + return { error: NextResponse.json({ error: error.message }, { status: error.statusCode }) } } - return null + return { error: null, actorUserId: preprocessResult.actorUserId } } catch (preprocessError) { logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError) if (foundWebhook.provider === 'microsoft-teams') { - return NextResponse.json( - { - type: 'message', - text: 'Internal error during preprocessing', - }, - { status: 500 } - ) + return { + error: NextResponse.json( + { + type: 'message', + text: 'Internal error during preprocessing', + }, + { status: 500 } + ), + } } - return NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 }) + return { + error: NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 }), + } } } @@ -1060,22 +1071,7 @@ export async function queueWebhookExecution( // Note: Each webhook now has its own credentialId (credential sets are fanned out at save time) const providerConfig = (foundWebhook.providerConfig as Record) || {} const credentialId = providerConfig.credentialId as string | undefined - let credentialAccountUserId: string | undefined - if (credentialId) { - const resolved = await resolveOAuthAccountId(credentialId) - if (!resolved) { - logger.error( - `[${options.requestId}] Failed to resolve OAuth account for credential ${credentialId}` - ) - return formatProviderErrorResponse(foundWebhook, 'Failed to resolve credential', 500) - } - const [credentialRecord] = await db - .select({ userId: account.userId }) - .from(account) - .where(eq(account.id, resolved.accountId)) - .limit(1) - credentialAccountUserId = credentialRecord?.userId - } + // credentialSetId is a direct field on webhook table, not in providerConfig const credentialSetId = foundWebhook.credentialSetId as string | undefined @@ -1090,16 +1086,9 @@ export async function queueWebhookExecution( } } - if (!foundWorkflow.workspaceId) { - logger.error(`[${options.requestId}] Workflow ${foundWorkflow.id} has no workspaceId`) - return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 }) - } - - const actorUserId = await getWorkspaceBilledAccountUserId(foundWorkflow.workspaceId) + const actorUserId = options.actorUserId if (!actorUserId) { - logger.error( - `[${options.requestId}] No billing account for workspace ${foundWorkflow.workspaceId}` - ) + logger.error(`[${options.requestId}] No actorUserId provided for webhook ${foundWebhook.id}`) return NextResponse.json({ error: 'Unable to resolve billing account' }, { status: 500 }) } @@ -1112,8 +1101,8 @@ export async function queueWebhookExecution( headers, path: options.path || foundWebhook.path, blockId: foundWebhook.blockId, + workspaceId: foundWorkflow.workspaceId, ...(credentialId ? { credentialId } : {}), - ...(credentialAccountUserId ? { credentialAccountUserId } : {}), } const jobQueue = await getJobQueue()