|
| 1 | +import type { PropagationContext } from '@sentry/core'; |
| 2 | +import { |
| 3 | + captureException, |
| 4 | + flush, |
| 5 | + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, |
| 6 | + SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, |
| 7 | + startSpan, |
| 8 | + withIsolationScope, |
| 9 | + withScope, |
| 10 | +} from '@sentry/core'; |
| 11 | +import type { |
| 12 | + WorkflowEntrypoint, |
| 13 | + WorkflowEvent, |
| 14 | + WorkflowSleepDuration, |
| 15 | + WorkflowStep, |
| 16 | + WorkflowStepConfig, |
| 17 | + WorkflowStepEvent, |
| 18 | + WorkflowTimeoutDuration, |
| 19 | +} from 'cloudflare:workers'; |
| 20 | +import { setAsyncLocalStorageAsyncContextStrategy } from './async'; |
| 21 | +import type { CloudflareOptions } from './client'; |
| 22 | +import { addCloudResourceContext } from './scope-utils'; |
| 23 | +import { init } from './sdk'; |
| 24 | + |
| 25 | +const UUID_REGEX = /^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$/i; |
| 26 | + |
| 27 | +function propagationContextFromInstanceId(instanceId: string): PropagationContext { |
| 28 | + // Validate and normalize traceId - should be a valid UUID with or without hyphens |
| 29 | + if (!UUID_REGEX.test(instanceId)) { |
| 30 | + throw new Error("Invalid 'instanceId' for workflow: Sentry requires random UUIDs for instanceId."); |
| 31 | + } |
| 32 | + |
| 33 | + // Remove hyphens to get UUID without hyphens |
| 34 | + const traceId = instanceId.replace(/-/g, ''); |
| 35 | + |
| 36 | + // Derive sampleRand from last 4 characters of the random UUID |
| 37 | + // |
| 38 | + // We cannot store any state between workflow steps, so we derive the |
| 39 | + // sampleRand from the traceId itself. This ensures that the sampling is |
| 40 | + // consistent across all steps in the same workflow instance. |
| 41 | + const sampleRand = parseInt(traceId.slice(-4), 16) / 0xffff; |
| 42 | + |
| 43 | + return { |
| 44 | + traceId, |
| 45 | + sampleRand, |
| 46 | + }; |
| 47 | +} |
| 48 | + |
| 49 | +async function workflowStepWithSentry<V>( |
| 50 | + instanceId: string, |
| 51 | + options: CloudflareOptions, |
| 52 | + callback: () => V, |
| 53 | +): Promise<V> { |
| 54 | + setAsyncLocalStorageAsyncContextStrategy(); |
| 55 | + |
| 56 | + return withIsolationScope(async isolationScope => { |
| 57 | + const client = init({ ...options, enableDedupe: false }); |
| 58 | + isolationScope.setClient(client); |
| 59 | + |
| 60 | + addCloudResourceContext(isolationScope); |
| 61 | + |
| 62 | + return withScope(async scope => { |
| 63 | + const propagationContext = propagationContextFromInstanceId(instanceId); |
| 64 | + scope.setPropagationContext(propagationContext); |
| 65 | + |
| 66 | + // eslint-disable-next-line no-return-await |
| 67 | + return await callback(); |
| 68 | + }); |
| 69 | + }); |
| 70 | +} |
| 71 | + |
| 72 | +class WrappedWorkflowStep implements WorkflowStep { |
| 73 | + public constructor( |
| 74 | + private _instanceId: string, |
| 75 | + private _ctx: ExecutionContext, |
| 76 | + private _options: CloudflareOptions, |
| 77 | + private _step: WorkflowStep, |
| 78 | + ) {} |
| 79 | + |
| 80 | + public async do<T extends Rpc.Serializable<T>>(name: string, callback: () => Promise<T>): Promise<T>; |
| 81 | + public async do<T extends Rpc.Serializable<T>>( |
| 82 | + name: string, |
| 83 | + config: WorkflowStepConfig, |
| 84 | + callback: () => Promise<T>, |
| 85 | + ): Promise<T>; |
| 86 | + public async do<T extends Rpc.Serializable<T>>( |
| 87 | + name: string, |
| 88 | + configOrCallback: WorkflowStepConfig | (() => Promise<T>), |
| 89 | + maybeCallback?: () => Promise<T>, |
| 90 | + ): Promise<T> { |
| 91 | + const userCallback = (maybeCallback || configOrCallback) as () => Promise<T>; |
| 92 | + const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; |
| 93 | + |
| 94 | + const instrumentedCallback: () => Promise<T> = async () => { |
| 95 | + return workflowStepWithSentry(this._instanceId, this._options, async () => { |
| 96 | + return startSpan( |
| 97 | + { |
| 98 | + op: 'function.step.do', |
| 99 | + name, |
| 100 | + attributes: { |
| 101 | + 'cloudflare.workflow.timeout': config?.timeout, |
| 102 | + 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, |
| 103 | + 'cloudflare.workflow.retries.delay': config?.retries?.delay, |
| 104 | + 'cloudflare.workflow.retries.limit': config?.retries?.limit, |
| 105 | + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', |
| 106 | + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', |
| 107 | + }, |
| 108 | + }, |
| 109 | + async span => { |
| 110 | + try { |
| 111 | + const result = await userCallback(); |
| 112 | + span.setStatus({ code: 1 }); |
| 113 | + return result; |
| 114 | + } catch (error) { |
| 115 | + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); |
| 116 | + throw error; |
| 117 | + } finally { |
| 118 | + this._ctx.waitUntil(flush(2000)); |
| 119 | + } |
| 120 | + }, |
| 121 | + ); |
| 122 | + }); |
| 123 | + }; |
| 124 | + |
| 125 | + return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback); |
| 126 | + } |
| 127 | + |
| 128 | + public async sleep(name: string, duration: WorkflowSleepDuration): Promise<void> { |
| 129 | + return this._step.sleep(name, duration); |
| 130 | + } |
| 131 | + |
| 132 | + public async sleepUntil(name: string, timestamp: Date | number): Promise<void> { |
| 133 | + return this._step.sleepUntil(name, timestamp); |
| 134 | + } |
| 135 | + |
| 136 | + public async waitForEvent<T extends Rpc.Serializable<T>>( |
| 137 | + name: string, |
| 138 | + options: { type: string; timeout?: WorkflowTimeoutDuration | number }, |
| 139 | + ): Promise<WorkflowStepEvent<T>> { |
| 140 | + return this._step.waitForEvent<T>(name, options); |
| 141 | + } |
| 142 | +} |
| 143 | + |
| 144 | +/** |
| 145 | + * Instruments a Cloudflare Workflow class with Sentry. |
| 146 | + * |
| 147 | + * @example |
| 148 | + * ```typescript |
| 149 | + * const InstrumentedWorkflow = instrumentWorkflowWithSentry( |
| 150 | + * (env) => ({ dsn: env.SENTRY_DSN }), |
| 151 | + * MyWorkflowClass |
| 152 | + * ); |
| 153 | + * |
| 154 | + * export default InstrumentedWorkflow; |
| 155 | + * ``` |
| 156 | + * |
| 157 | + * @param optionsCallback - Function that returns Sentry options to initialize Sentry |
| 158 | + * @param WorkflowClass - The workflow class to instrument |
| 159 | + * @returns Instrumented workflow class with the same interface |
| 160 | + */ |
| 161 | +export function instrumentWorkflowWithSentry< |
| 162 | + E, // Environment type |
| 163 | + P, // Payload type |
| 164 | + T extends WorkflowEntrypoint<E, P>, // WorkflowEntrypoint type |
| 165 | + C extends new (ctx: ExecutionContext, env: E) => T, // Constructor type of the WorkflowEntrypoint class |
| 166 | +>(optionsCallback: (env: E) => CloudflareOptions, WorkFlowClass: C): C { |
| 167 | + return new Proxy(WorkFlowClass, { |
| 168 | + construct(target: C, args: [ctx: ExecutionContext, env: E], newTarget) { |
| 169 | + const [ctx, env] = args; |
| 170 | + const options = optionsCallback(env); |
| 171 | + const instance = Reflect.construct(target, args, newTarget) as T; |
| 172 | + return new Proxy(instance, { |
| 173 | + get(obj, prop, receiver) { |
| 174 | + if (prop === 'run') { |
| 175 | + return async function (event: WorkflowEvent<P>, step: WorkflowStep): Promise<unknown> { |
| 176 | + return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step)); |
| 177 | + }; |
| 178 | + } |
| 179 | + return Reflect.get(obj, prop, receiver); |
| 180 | + }, |
| 181 | + }); |
| 182 | + }, |
| 183 | + }) as C; |
| 184 | +} |
0 commit comments