Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions apps/sim/app/api/webhooks/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,7 @@ export async function POST(request: NextRequest) {
)
}

// Configure each new webhook (for providers that need configuration)
const pollingProviders = ['gmail', 'outlook']
const needsConfiguration = pollingProviders.includes(provider)
const needsConfiguration = provider === 'gmail' || provider === 'outlook'

if (needsConfiguration) {
const configureFunc =
Expand Down
1 change: 1 addition & 0 deletions apps/sim/blocks/blocks/generic_webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const GenericWebhookBlock: BlockConfig = {
bestPractices: `
- You can test the webhook by sending a request to the webhook URL. E.g. depending on authorization: curl -X POST http://localhost:3000/api/webhooks/trigger/d8abcf0d-1ee5-4b77-bb07-b1e8142ea4e9 -H "Content-Type: application/json" -H "X-Sim-Secret: 1234" -d '{"message": "Test webhook trigger", "data": {"key": "v"}}'
- Continuing example above, the body can be accessed in downstream block using dot notation. E.g. <webhook1.message> and <webhook1.data.key>
- To deduplicate incoming events, set the Deduplication Field to a dot-notation path of a unique field in the payload (e.g. "event.id"). Duplicate values within 7 days will be skipped.
- Only use when there's no existing integration for the service with triggerAllowed flag set to true.
`,
subBlocks: [...getTrigger('generic_webhook').subBlocks],
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/handlers/trigger/trigger-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class TriggerBlockHandler implements BlockHandler {
}

const existingState = ctx.blockStates.get(block.id)
if (existingState?.output && Object.keys(existingState.output).length > 0) {
if (existingState?.output) {
return existingState.output
}

Expand Down
27 changes: 27 additions & 0 deletions apps/sim/lib/core/async-jobs/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const logger = createLogger('AsyncJobsConfig')

let cachedBackend: JobQueueBackend | null = null
let cachedBackendType: AsyncBackendType | null = null
let cachedInlineBackend: JobQueueBackend | null = null

/**
* Determines which async backend to use based on environment configuration.
Expand Down Expand Up @@ -71,6 +72,31 @@ export function getCurrentBackendType(): AsyncBackendType | null {
return cachedBackendType
}

/**
* Gets a job queue backend that bypasses Trigger.dev (Redis -> Database).
* Used for non-polling webhooks that should always execute inline.
*/
export async function getInlineJobQueue(): Promise<JobQueueBackend> {
if (cachedInlineBackend) {
return cachedInlineBackend
}

const redis = getRedisClient()
let type: string
if (redis) {
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
cachedInlineBackend = new RedisJobQueue(redis)
type = 'redis'
} else {
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
cachedInlineBackend = new DatabaseJobQueue()
type = 'database'
}

logger.info(`Inline job backend initialized: ${type}`)
return cachedInlineBackend
}

/**
* Checks if jobs should be executed inline (fire-and-forget).
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
Expand All @@ -85,4 +111,5 @@ export function shouldExecuteInline(): boolean {
export function resetJobQueueCache(): void {
cachedBackend = null
cachedBackendType = null
cachedInlineBackend = null
}
1 change: 1 addition & 0 deletions apps/sim/lib/core/async-jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export {
getAsyncBackendType,
getCurrentBackendType,
getInlineJobQueue,
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,
Expand Down
1 change: 1 addition & 0 deletions apps/sim/lib/core/idempotency/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ export class IdempotencyService {
: undefined

const webhookIdHeader =
normalizedHeaders?.['x-sim-idempotency-key'] ||
normalizedHeaders?.['webhook-id'] ||
normalizedHeaders?.['x-webhook-id'] ||
normalizedHeaders?.['x-shopify-webhook-id'] ||
Expand Down
65 changes: 53 additions & 12 deletions apps/sim/lib/webhooks/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { isProd } from '@/lib/core/config/feature-flags'
import { safeCompare } from '@/lib/core/security/encryption'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
Expand All @@ -29,6 +29,7 @@ import {
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
import { isPollingWebhookProvider } from '@/triggers/constants'
import { isGitHubEventMatch } from '@/triggers/github/utils'
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
import { isJiraEventMatch } from '@/triggers/jira/utils'
Expand Down Expand Up @@ -1049,7 +1050,7 @@ export async function queueWebhookExecution(
}
}

const headers = Object.fromEntries(request.headers.entries())
const { 'x-sim-idempotency-key': _, ...headers } = Object.fromEntries(request.headers.entries())

// For Microsoft Teams Graph notifications, extract unique identifiers for idempotency
if (
Expand All @@ -1067,9 +1068,20 @@ export async function queueWebhookExecution(
}
}

// Extract credentialId from webhook config
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}

if (foundWebhook.provider === 'generic') {
const idempotencyField = providerConfig.idempotencyField as string | undefined
if (idempotencyField && body) {
const value = idempotencyField
.split('.')
.reduce((acc: any, key: string) => acc?.[key], body)
if (value !== undefined && value !== null && typeof value !== 'object') {
headers['x-sim-idempotency-key'] = String(value)
}
}
}

const credentialId = providerConfig.credentialId as string | undefined

// credentialSetId is a direct field on webhook table, not in providerConfig
Expand Down Expand Up @@ -1105,15 +1117,24 @@ export async function queueWebhookExecution(
...(credentialId ? { credentialId } : {}),
}

const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook`
)
const isPolling = isPollingWebhookProvider(payload.provider)

if (shouldExecuteInline()) {
if (isPolling && !shouldExecuteInline()) {
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue`
)
} else {
const jobQueue = await getInlineJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline`
)
void (async () => {
try {
await jobQueue.startJob(jobId)
Expand Down Expand Up @@ -1193,6 +1214,26 @@ export async function queueWebhookExecution(
})
}

if (foundWebhook.provider === 'generic' && providerConfig.responseMode === 'custom') {
const rawCode = Number(providerConfig.responseStatusCode) || 200
const statusCode = rawCode >= 100 && rawCode <= 599 ? rawCode : 200
const responseBody = (providerConfig.responseBody as string | undefined)?.trim()

if (!responseBody) {
return new NextResponse(null, { status: statusCode })
}

try {
const parsed = JSON.parse(responseBody)
return NextResponse.json(parsed, { status: statusCode })
} catch {
return new NextResponse(responseBody, {
status: statusCode,
headers: { 'Content-Type': 'text/plain' },
})
}
}

return NextResponse.json({ message: 'Webhook processed' })
} catch (error: any) {
logger.error(`[${options.requestId}] Failed to queue webhook execution:`, error)
Expand Down
6 changes: 2 additions & 4 deletions apps/sim/lib/webhooks/utils.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
import { isPollingWebhookProvider } from '@/triggers/constants'

const logger = createLogger('WebhookUtils')

Expand Down Expand Up @@ -2222,10 +2223,7 @@ export async function syncWebhooksForCredentialSet(params: {
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
)

// Polling providers get unique paths per credential (for independent state)
// External webhook providers share the same path (external service sends to one URL)
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
const useUniquePaths = pollingProviders.includes(provider)
const useUniquePaths = isPollingWebhookProvider(provider)

const credentials = await getCredentialsForCredentialSet(credentialSetId, oauthProviderId)

Expand Down
4 changes: 2 additions & 2 deletions apps/sim/lib/workflows/comparison/compare.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ describe('hasWorkflowChanged', () => {
expect(hasWorkflowChanged(state1, state2)).toBe(true)
})

it.concurrent('should detect subBlock type changes', () => {
it.concurrent('should ignore subBlock type changes', () => {
const state1 = createWorkflowState({
blocks: {
block1: createBlock('block1', {
Expand All @@ -448,7 +448,7 @@ describe('hasWorkflowChanged', () => {
}),
},
})
expect(hasWorkflowChanged(state1, state2)).toBe(true)
expect(hasWorkflowChanged(state1, state2)).toBe(false)
})

it.concurrent('should handle null/undefined subBlock values consistently', () => {
Expand Down
9 changes: 8 additions & 1 deletion apps/sim/lib/workflows/comparison/normalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,14 @@ export function normalizeSubBlockValue(subBlockId: string, value: unknown): unkn
* @returns SubBlock fields excluding value and is_diff
*/
export function extractSubBlockRest(subBlock: Record<string, unknown>): Record<string, unknown> {
const { value: _v, is_diff: _sd, ...rest } = subBlock as SubBlockWithDiffMarker
const {
value: _v,
is_diff: _sd,
type: _type,
...rest
} = subBlock as SubBlockWithDiffMarker & {
type?: unknown
}
return rest
}

Expand Down
36 changes: 36 additions & 0 deletions apps/sim/tools/gmail/utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { encodeRfc2047 } from './utils'

describe('encodeRfc2047', () => {
it('returns ASCII text unchanged', () => {
expect(encodeRfc2047('Simple ASCII Subject')).toBe('Simple ASCII Subject')
})

it('returns empty string unchanged', () => {
expect(encodeRfc2047('')).toBe('')
})

it('encodes emojis as RFC 2047 base64', () => {
const result = encodeRfc2047('Time to Stretch! 🧘')
expect(result).toBe('=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?=')
})

it('round-trips non-ASCII subjects correctly', () => {
const subjects = ['Hello 世界', 'Café résumé', '🎉🎊🎈 Party!', '今週のミーティング']
for (const subject of subjects) {
const encoded = encodeRfc2047(subject)
const match = encoded.match(/^=\?UTF-8\?B\?(.+)\?=$/)
expect(match).not.toBeNull()
const decoded = Buffer.from(match![1], 'base64').toString('utf-8')
expect(decoded).toBe(subject)
}
})

it('does not double-encode already-encoded subjects', () => {
const alreadyEncoded = '=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?='
expect(encodeRfc2047(alreadyEncoded)).toBe(alreadyEncoded)
})
})
17 changes: 15 additions & 2 deletions apps/sim/tools/gmail/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,19 @@ function generateBoundary(): string {
return `----=_Part_${Date.now()}_${Math.random().toString(36).substring(2, 15)}`
}

/**
* Encode a header value using RFC 2047 Base64 encoding if it contains non-ASCII characters.
* This matches Google's own Gmail API sample: `=?utf-8?B?${Buffer.from(subject).toString('base64')}?=`
* @see https://github.com/googleapis/google-api-nodejs-client/blob/main/samples/gmail/send.js
*/
export function encodeRfc2047(value: string): string {
// eslint-disable-next-line no-control-regex
if (/^[\x00-\x7F]*$/.test(value)) {
return value
}
return `=?UTF-8?B?${Buffer.from(value, 'utf-8').toString('base64')}?=`
}

/**
* Encode string or buffer to base64url format (URL-safe base64)
* Gmail API requires base64url encoding for the raw message field
Expand Down Expand Up @@ -333,7 +346,7 @@ export function buildSimpleEmailMessage(params: {
emailHeaders.push(`Bcc: ${bcc}`)
}

emailHeaders.push(`Subject: ${subject || ''}`)
emailHeaders.push(`Subject: ${encodeRfc2047(subject || '')}`)

if (inReplyTo) {
emailHeaders.push(`In-Reply-To: ${inReplyTo}`)
Expand Down Expand Up @@ -380,7 +393,7 @@ export function buildMimeMessage(params: BuildMimeMessageParams): string {
if (bcc) {
messageParts.push(`Bcc: ${bcc}`)
}
messageParts.push(`Subject: ${subject || ''}`)
messageParts.push(`Subject: ${encodeRfc2047(subject || '')}`)

if (inReplyTo) {
messageParts.push(`In-Reply-To: ${inReplyTo}`)
Expand Down
41 changes: 41 additions & 0 deletions apps/sim/triggers/constants.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { POLLING_PROVIDERS } from '@/triggers/constants'
import { TRIGGER_REGISTRY } from '@/triggers/registry'

describe('POLLING_PROVIDERS sync with TriggerConfig.polling', () => {
it('matches every trigger with polling: true in the registry', () => {
const registryPollingProviders = new Set(
Object.values(TRIGGER_REGISTRY)
.filter((t) => t.polling === true)
.map((t) => t.provider)
)

expect(POLLING_PROVIDERS).toEqual(registryPollingProviders)
})

it('no trigger with polling: true is missing from POLLING_PROVIDERS', () => {
const missing: string[] = []
for (const trigger of Object.values(TRIGGER_REGISTRY)) {
if (trigger.polling && !POLLING_PROVIDERS.has(trigger.provider)) {
missing.push(`${trigger.id} (provider: ${trigger.provider})`)
}
}
expect(missing, `Triggers with polling: true missing from POLLING_PROVIDERS`).toEqual([])
})

it('no POLLING_PROVIDERS entry lacks a polling: true trigger in the registry', () => {
const extra: string[] = []
for (const provider of POLLING_PROVIDERS) {
const hasTrigger = Object.values(TRIGGER_REGISTRY).some(
(t) => t.provider === provider && t.polling === true
)
if (!hasTrigger) {
extra.push(provider)
}
}
expect(extra, `POLLING_PROVIDERS entries with no matching polling trigger`).toEqual([])
})
})
12 changes: 12 additions & 0 deletions apps/sim/triggers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,15 @@ export const TRIGGER_RUNTIME_SUBBLOCK_IDS: string[] = [
* This prevents runaway errors from continuously executing failing workflows.
*/
export const MAX_CONSECUTIVE_FAILURES = 100

/**
* Set of webhook provider names that use polling-based triggers.
* Mirrors the `polling: true` flag on TriggerConfig entries.
* Used to route execution: polling providers use the full job queue
* (Trigger.dev), non-polling providers execute inline.
*/
export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap'])

export function isPollingWebhookProvider(provider: string): boolean {
return POLLING_PROVIDERS.has(provider)
}
Loading
Loading