diff --git a/apps/sim/app/api/webhooks/route.ts b/apps/sim/app/api/webhooks/route.ts index 4d5508a1256..86b5962a772 100644 --- a/apps/sim/app/api/webhooks/route.ts +++ b/apps/sim/app/api/webhooks/route.ts @@ -367,9 +367,7 @@ export async function POST(request: NextRequest) { ) } - // Configure each new webhook (for providers that need configuration) - const pollingProviders = ['gmail', 'outlook'] - const needsConfiguration = pollingProviders.includes(provider) + const needsConfiguration = provider === 'gmail' || provider === 'outlook' if (needsConfiguration) { const configureFunc = diff --git a/apps/sim/blocks/blocks/generic_webhook.ts b/apps/sim/blocks/blocks/generic_webhook.ts index 97ed9c8ec43..08cd69108c2 100644 --- a/apps/sim/blocks/blocks/generic_webhook.ts +++ b/apps/sim/blocks/blocks/generic_webhook.ts @@ -18,6 +18,7 @@ export const GenericWebhookBlock: BlockConfig = { bestPractices: ` - You can test the webhook by sending a request to the webhook URL. E.g. depending on authorization: curl -X POST http://localhost:3000/api/webhooks/trigger/d8abcf0d-1ee5-4b77-bb07-b1e8142ea4e9 -H "Content-Type: application/json" -H "X-Sim-Secret: 1234" -d '{"message": "Test webhook trigger", "data": {"key": "v"}}' - Continuing example above, the body can be accessed in downstream block using dot notation. E.g. and + - To deduplicate incoming events, set the Deduplication Field to a dot-notation path of a unique field in the payload (e.g. "event.id"). Duplicate values within 7 days will be skipped. - Only use when there's no existing integration for the service with triggerAllowed flag set to true. `, subBlocks: [...getTrigger('generic_webhook').subBlocks], diff --git a/apps/sim/executor/handlers/trigger/trigger-handler.ts b/apps/sim/executor/handlers/trigger/trigger-handler.ts index e8d14f8a730..f31aed7371a 100644 --- a/apps/sim/executor/handlers/trigger/trigger-handler.ts +++ b/apps/sim/executor/handlers/trigger/trigger-handler.ts @@ -22,7 +22,7 @@ export class TriggerBlockHandler implements BlockHandler { } const existingState = ctx.blockStates.get(block.id) - if (existingState?.output && Object.keys(existingState.output).length > 0) { + if (existingState?.output) { return existingState.output } diff --git a/apps/sim/lib/core/async-jobs/config.ts b/apps/sim/lib/core/async-jobs/config.ts index 6d5e020eb84..0537a6a8ef9 100644 --- a/apps/sim/lib/core/async-jobs/config.ts +++ b/apps/sim/lib/core/async-jobs/config.ts @@ -7,6 +7,7 @@ const logger = createLogger('AsyncJobsConfig') let cachedBackend: JobQueueBackend | null = null let cachedBackendType: AsyncBackendType | null = null +let cachedInlineBackend: JobQueueBackend | null = null /** * Determines which async backend to use based on environment configuration. @@ -71,6 +72,31 @@ export function getCurrentBackendType(): AsyncBackendType | null { return cachedBackendType } +/** + * Gets a job queue backend that bypasses Trigger.dev (Redis -> Database). + * Used for non-polling webhooks that should always execute inline. + */ +export async function getInlineJobQueue(): Promise { + if (cachedInlineBackend) { + return cachedInlineBackend + } + + const redis = getRedisClient() + let type: string + if (redis) { + const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis') + cachedInlineBackend = new RedisJobQueue(redis) + type = 'redis' + } else { + const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database') + cachedInlineBackend = new DatabaseJobQueue() + type = 'database' + } + + logger.info(`Inline job backend initialized: ${type}`) + return cachedInlineBackend +} + /** * Checks if jobs should be executed inline (fire-and-forget). * For Redis/DB backends, we execute inline. Trigger.dev handles execution itself. @@ -85,4 +111,5 @@ export function shouldExecuteInline(): boolean { export function resetJobQueueCache(): void { cachedBackend = null cachedBackendType = null + cachedInlineBackend = null } diff --git a/apps/sim/lib/core/async-jobs/index.ts b/apps/sim/lib/core/async-jobs/index.ts index 33bb6883029..24e6f1e526f 100644 --- a/apps/sim/lib/core/async-jobs/index.ts +++ b/apps/sim/lib/core/async-jobs/index.ts @@ -1,6 +1,7 @@ export { getAsyncBackendType, getCurrentBackendType, + getInlineJobQueue, getJobQueue, resetJobQueueCache, shouldExecuteInline, diff --git a/apps/sim/lib/core/idempotency/service.ts b/apps/sim/lib/core/idempotency/service.ts index b8fae55c03a..9582c5ba22a 100644 --- a/apps/sim/lib/core/idempotency/service.ts +++ b/apps/sim/lib/core/idempotency/service.ts @@ -413,6 +413,7 @@ export class IdempotencyService { : undefined const webhookIdHeader = + normalizedHeaders?.['x-sim-idempotency-key'] || normalizedHeaders?.['webhook-id'] || normalizedHeaders?.['x-webhook-id'] || normalizedHeaders?.['x-shopify-webhook-id'] || diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 5e3eaa49f6c..df45c1f981e 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils' -import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' +import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' import { isProd } from '@/lib/core/config/feature-flags' import { safeCompare } from '@/lib/core/security/encryption' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' @@ -29,6 +29,7 @@ import { import { executeWebhookJob } from '@/background/webhook-execution' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' import { isConfluencePayloadMatch } from '@/triggers/confluence/utils' +import { isPollingWebhookProvider } from '@/triggers/constants' import { isGitHubEventMatch } from '@/triggers/github/utils' import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils' import { isJiraEventMatch } from '@/triggers/jira/utils' @@ -1049,7 +1050,7 @@ export async function queueWebhookExecution( } } - const headers = Object.fromEntries(request.headers.entries()) + const { 'x-sim-idempotency-key': _, ...headers } = Object.fromEntries(request.headers.entries()) // For Microsoft Teams Graph notifications, extract unique identifiers for idempotency if ( @@ -1067,9 +1068,20 @@ export async function queueWebhookExecution( } } - // Extract credentialId from webhook config - // Note: Each webhook now has its own credentialId (credential sets are fanned out at save time) const providerConfig = (foundWebhook.providerConfig as Record) || {} + + if (foundWebhook.provider === 'generic') { + const idempotencyField = providerConfig.idempotencyField as string | undefined + if (idempotencyField && body) { + const value = idempotencyField + .split('.') + .reduce((acc: any, key: string) => acc?.[key], body) + if (value !== undefined && value !== null && typeof value !== 'object') { + headers['x-sim-idempotency-key'] = String(value) + } + } + } + const credentialId = providerConfig.credentialId as string | undefined // credentialSetId is a direct field on webhook table, not in providerConfig @@ -1105,15 +1117,24 @@ export async function queueWebhookExecution( ...(credentialId ? { credentialId } : {}), } - const jobQueue = await getJobQueue() - const jobId = await jobQueue.enqueue('webhook-execution', payload, { - metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, - }) - logger.info( - `[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook` - ) + const isPolling = isPollingWebhookProvider(payload.provider) - if (shouldExecuteInline()) { + if (isPolling && !shouldExecuteInline()) { + const jobQueue = await getJobQueue() + const jobId = await jobQueue.enqueue('webhook-execution', payload, { + metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, + }) + logger.info( + `[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue` + ) + } else { + const jobQueue = await getInlineJobQueue() + const jobId = await jobQueue.enqueue('webhook-execution', payload, { + metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, + }) + logger.info( + `[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline` + ) void (async () => { try { await jobQueue.startJob(jobId) @@ -1193,6 +1214,26 @@ export async function queueWebhookExecution( }) } + if (foundWebhook.provider === 'generic' && providerConfig.responseMode === 'custom') { + const rawCode = Number(providerConfig.responseStatusCode) || 200 + const statusCode = rawCode >= 100 && rawCode <= 599 ? rawCode : 200 + const responseBody = (providerConfig.responseBody as string | undefined)?.trim() + + if (!responseBody) { + return new NextResponse(null, { status: statusCode }) + } + + try { + const parsed = JSON.parse(responseBody) + return NextResponse.json(parsed, { status: statusCode }) + } catch { + return new NextResponse(responseBody, { + status: statusCode, + headers: { 'Content-Type': 'text/plain' }, + }) + } + } + return NextResponse.json({ message: 'Webhook processed' }) } catch (error: any) { logger.error(`[${options.requestId}] Failed to queue webhook execution:`, error) diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts index 8c8e15381ec..76068e451fe 100644 --- a/apps/sim/lib/webhooks/utils.server.ts +++ b/apps/sim/lib/webhooks/utils.server.ts @@ -19,6 +19,7 @@ import { refreshAccessTokenIfNeeded, resolveOAuthAccountId, } from '@/app/api/auth/oauth/utils' +import { isPollingWebhookProvider } from '@/triggers/constants' const logger = createLogger('WebhookUtils') @@ -2222,10 +2223,7 @@ export async function syncWebhooksForCredentialSet(params: { `[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}` ) - // Polling providers get unique paths per credential (for independent state) - // External webhook providers share the same path (external service sends to one URL) - const pollingProviders = ['gmail', 'outlook', 'rss', 'imap'] - const useUniquePaths = pollingProviders.includes(provider) + const useUniquePaths = isPollingWebhookProvider(provider) const credentials = await getCredentialsForCredentialSet(credentialSetId, oauthProviderId) diff --git a/apps/sim/lib/workflows/comparison/compare.test.ts b/apps/sim/lib/workflows/comparison/compare.test.ts index 5fe6e5923a3..f3a139ac915 100644 --- a/apps/sim/lib/workflows/comparison/compare.test.ts +++ b/apps/sim/lib/workflows/comparison/compare.test.ts @@ -433,7 +433,7 @@ describe('hasWorkflowChanged', () => { expect(hasWorkflowChanged(state1, state2)).toBe(true) }) - it.concurrent('should detect subBlock type changes', () => { + it.concurrent('should ignore subBlock type changes', () => { const state1 = createWorkflowState({ blocks: { block1: createBlock('block1', { @@ -448,7 +448,7 @@ describe('hasWorkflowChanged', () => { }), }, }) - expect(hasWorkflowChanged(state1, state2)).toBe(true) + expect(hasWorkflowChanged(state1, state2)).toBe(false) }) it.concurrent('should handle null/undefined subBlock values consistently', () => { diff --git a/apps/sim/lib/workflows/comparison/normalize.ts b/apps/sim/lib/workflows/comparison/normalize.ts index 70a584141d0..741208e62ed 100644 --- a/apps/sim/lib/workflows/comparison/normalize.ts +++ b/apps/sim/lib/workflows/comparison/normalize.ts @@ -496,7 +496,14 @@ export function normalizeSubBlockValue(subBlockId: string, value: unknown): unkn * @returns SubBlock fields excluding value and is_diff */ export function extractSubBlockRest(subBlock: Record): Record { - const { value: _v, is_diff: _sd, ...rest } = subBlock as SubBlockWithDiffMarker + const { + value: _v, + is_diff: _sd, + type: _type, + ...rest + } = subBlock as SubBlockWithDiffMarker & { + type?: unknown + } return rest } diff --git a/apps/sim/tools/gmail/utils.test.ts b/apps/sim/tools/gmail/utils.test.ts new file mode 100644 index 00000000000..5c28ac9a139 --- /dev/null +++ b/apps/sim/tools/gmail/utils.test.ts @@ -0,0 +1,36 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { encodeRfc2047 } from './utils' + +describe('encodeRfc2047', () => { + it('returns ASCII text unchanged', () => { + expect(encodeRfc2047('Simple ASCII Subject')).toBe('Simple ASCII Subject') + }) + + it('returns empty string unchanged', () => { + expect(encodeRfc2047('')).toBe('') + }) + + it('encodes emojis as RFC 2047 base64', () => { + const result = encodeRfc2047('Time to Stretch! 🧘') + expect(result).toBe('=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?=') + }) + + it('round-trips non-ASCII subjects correctly', () => { + const subjects = ['Hello 世界', 'Café résumé', '🎉🎊🎈 Party!', '今週のミーティング'] + for (const subject of subjects) { + const encoded = encodeRfc2047(subject) + const match = encoded.match(/^=\?UTF-8\?B\?(.+)\?=$/) + expect(match).not.toBeNull() + const decoded = Buffer.from(match![1], 'base64').toString('utf-8') + expect(decoded).toBe(subject) + } + }) + + it('does not double-encode already-encoded subjects', () => { + const alreadyEncoded = '=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?=' + expect(encodeRfc2047(alreadyEncoded)).toBe(alreadyEncoded) + }) +}) diff --git a/apps/sim/tools/gmail/utils.ts b/apps/sim/tools/gmail/utils.ts index 4d856db1d5e..f374a4e00f5 100644 --- a/apps/sim/tools/gmail/utils.ts +++ b/apps/sim/tools/gmail/utils.ts @@ -294,6 +294,19 @@ function generateBoundary(): string { return `----=_Part_${Date.now()}_${Math.random().toString(36).substring(2, 15)}` } +/** + * Encode a header value using RFC 2047 Base64 encoding if it contains non-ASCII characters. + * This matches Google's own Gmail API sample: `=?utf-8?B?${Buffer.from(subject).toString('base64')}?=` + * @see https://github.com/googleapis/google-api-nodejs-client/blob/main/samples/gmail/send.js + */ +export function encodeRfc2047(value: string): string { + // eslint-disable-next-line no-control-regex + if (/^[\x00-\x7F]*$/.test(value)) { + return value + } + return `=?UTF-8?B?${Buffer.from(value, 'utf-8').toString('base64')}?=` +} + /** * Encode string or buffer to base64url format (URL-safe base64) * Gmail API requires base64url encoding for the raw message field @@ -333,7 +346,7 @@ export function buildSimpleEmailMessage(params: { emailHeaders.push(`Bcc: ${bcc}`) } - emailHeaders.push(`Subject: ${subject || ''}`) + emailHeaders.push(`Subject: ${encodeRfc2047(subject || '')}`) if (inReplyTo) { emailHeaders.push(`In-Reply-To: ${inReplyTo}`) @@ -380,7 +393,7 @@ export function buildMimeMessage(params: BuildMimeMessageParams): string { if (bcc) { messageParts.push(`Bcc: ${bcc}`) } - messageParts.push(`Subject: ${subject || ''}`) + messageParts.push(`Subject: ${encodeRfc2047(subject || '')}`) if (inReplyTo) { messageParts.push(`In-Reply-To: ${inReplyTo}`) diff --git a/apps/sim/triggers/constants.test.ts b/apps/sim/triggers/constants.test.ts new file mode 100644 index 00000000000..6de36a36a3c --- /dev/null +++ b/apps/sim/triggers/constants.test.ts @@ -0,0 +1,41 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { POLLING_PROVIDERS } from '@/triggers/constants' +import { TRIGGER_REGISTRY } from '@/triggers/registry' + +describe('POLLING_PROVIDERS sync with TriggerConfig.polling', () => { + it('matches every trigger with polling: true in the registry', () => { + const registryPollingProviders = new Set( + Object.values(TRIGGER_REGISTRY) + .filter((t) => t.polling === true) + .map((t) => t.provider) + ) + + expect(POLLING_PROVIDERS).toEqual(registryPollingProviders) + }) + + it('no trigger with polling: true is missing from POLLING_PROVIDERS', () => { + const missing: string[] = [] + for (const trigger of Object.values(TRIGGER_REGISTRY)) { + if (trigger.polling && !POLLING_PROVIDERS.has(trigger.provider)) { + missing.push(`${trigger.id} (provider: ${trigger.provider})`) + } + } + expect(missing, `Triggers with polling: true missing from POLLING_PROVIDERS`).toEqual([]) + }) + + it('no POLLING_PROVIDERS entry lacks a polling: true trigger in the registry', () => { + const extra: string[] = [] + for (const provider of POLLING_PROVIDERS) { + const hasTrigger = Object.values(TRIGGER_REGISTRY).some( + (t) => t.provider === provider && t.polling === true + ) + if (!hasTrigger) { + extra.push(provider) + } + } + expect(extra, `POLLING_PROVIDERS entries with no matching polling trigger`).toEqual([]) + }) +}) diff --git a/apps/sim/triggers/constants.ts b/apps/sim/triggers/constants.ts index d7fcdc997b3..feff397f4cf 100644 --- a/apps/sim/triggers/constants.ts +++ b/apps/sim/triggers/constants.ts @@ -35,3 +35,15 @@ export const TRIGGER_RUNTIME_SUBBLOCK_IDS: string[] = [ * This prevents runaway errors from continuously executing failing workflows. */ export const MAX_CONSECUTIVE_FAILURES = 100 + +/** + * Set of webhook provider names that use polling-based triggers. + * Mirrors the `polling: true` flag on TriggerConfig entries. + * Used to route execution: polling providers use the full job queue + * (Trigger.dev), non-polling providers execute inline. + */ +export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap']) + +export function isPollingWebhookProvider(provider: string): boolean { + return POLLING_PROVIDERS.has(provider) +} diff --git a/apps/sim/triggers/generic/webhook.ts b/apps/sim/triggers/generic/webhook.ts index a91c8857dee..295c58a7396 100644 --- a/apps/sim/triggers/generic/webhook.ts +++ b/apps/sim/triggers/generic/webhook.ts @@ -49,6 +49,49 @@ export const genericWebhookTrigger: TriggerConfig = { required: false, mode: 'trigger', }, + { + id: 'idempotencyField', + title: 'Deduplication Field (Optional)', + type: 'short-input', + placeholder: 'e.g. event.id', + description: + 'Dot-notation path to a unique field in the payload for deduplication. If the same value is seen within 7 days, the duplicate webhook will be skipped.', + required: false, + mode: 'trigger', + }, + { + id: 'responseMode', + title: 'Acknowledgement', + type: 'dropdown', + options: [ + { label: 'Default', id: 'default' }, + { label: 'Custom', id: 'custom' }, + ], + defaultValue: 'default', + mode: 'trigger', + }, + { + id: 'responseStatusCode', + title: 'Response Status Code', + type: 'short-input', + placeholder: '200 (default)', + description: + 'HTTP status code (100–599) to return to the webhook caller. Defaults to 200 if empty or invalid.', + required: false, + mode: 'trigger', + condition: { field: 'responseMode', value: 'custom' }, + }, + { + id: 'responseBody', + title: 'Response Body', + type: 'code', + language: 'json', + placeholder: '{"ok": true}', + description: 'JSON body to return to the webhook caller. Leave empty for no body.', + required: false, + mode: 'trigger', + condition: { field: 'responseMode', value: 'custom' }, + }, { id: 'inputFormat', title: 'Input Format', @@ -76,7 +119,7 @@ export const genericWebhookTrigger: TriggerConfig = { 'The webhook will receive any HTTP method (GET, POST, PUT, DELETE, etc.).', 'All request data (headers, body, query parameters) will be available in your workflow.', 'If authentication is enabled, include the token in requests using either the custom header or "Authorization: Bearer TOKEN".', - 'Common fields like "event", "id", and "data" will be automatically extracted from the payload when available.', + 'To deduplicate incoming events, set the Deduplication Field to the dot-notation path of a unique identifier in the payload (e.g. "event.id"). Duplicate values within 7 days will be skipped.', ] .map( (instruction, index) => diff --git a/apps/sim/triggers/gmail/poller.ts b/apps/sim/triggers/gmail/poller.ts index ee8a8c94718..ada550c5f34 100644 --- a/apps/sim/triggers/gmail/poller.ts +++ b/apps/sim/triggers/gmail/poller.ts @@ -30,6 +30,7 @@ export const gmailPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received in Gmail (requires Gmail credentials)', version: '1.0.0', icon: GmailIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/imap/poller.ts b/apps/sim/triggers/imap/poller.ts index cfcc5c5d724..b7a8063e1ab 100644 --- a/apps/sim/triggers/imap/poller.ts +++ b/apps/sim/triggers/imap/poller.ts @@ -12,6 +12,7 @@ export const imapPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received via IMAP (works with any email provider)', version: '1.0.0', icon: MailServerIcon, + polling: true, subBlocks: [ // Connection settings diff --git a/apps/sim/triggers/outlook/poller.ts b/apps/sim/triggers/outlook/poller.ts index 9f3d9b09b17..bd22d2d13bc 100644 --- a/apps/sim/triggers/outlook/poller.ts +++ b/apps/sim/triggers/outlook/poller.ts @@ -24,6 +24,7 @@ export const outlookPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received in Outlook (requires Microsoft credentials)', version: '1.0.0', icon: OutlookIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/rss/poller.ts b/apps/sim/triggers/rss/poller.ts index 8d295d47580..0877ee06356 100644 --- a/apps/sim/triggers/rss/poller.ts +++ b/apps/sim/triggers/rss/poller.ts @@ -8,6 +8,7 @@ export const rssPollingTrigger: TriggerConfig = { description: 'Triggers when new items are published to an RSS feed', version: '1.0.0', icon: RssIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/types.ts b/apps/sim/triggers/types.ts index 3696c4597b2..69e5a5d2fbd 100644 --- a/apps/sim/triggers/types.ts +++ b/apps/sim/triggers/types.ts @@ -25,6 +25,9 @@ export interface TriggerConfig { method?: 'POST' | 'GET' | 'PUT' | 'DELETE' headers?: Record } + + /** When true, this trigger is poll-based (cron-driven) rather than push-based. */ + polling?: boolean } export interface TriggerRegistry {