Skip to content

Commit 2c68a7c

Browse files
committed
Refactor
1 parent 35a1a02 commit 2c68a7c

File tree

1 file changed

+80
-73
lines changed
  • packages/core/src/utils/anthropic-ai

1 file changed

+80
-73
lines changed

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 80 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,74 @@ function addResponseAttributes(span: Span, response: AnthropicAiResponse, record
194194
addMetadataAttributes(span, response);
195195
}
196196

197+
/**
198+
* Handle common error catching and reporting for streaming requests
199+
*/
200+
function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
201+
captureException(error, {
202+
mechanism: { handled: false, type: 'auto.ai.anthropic', data: { function: methodPath } },
203+
});
204+
205+
if (span.isRecording()) {
206+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
207+
span.end();
208+
}
209+
throw error;
210+
}
211+
212+
/**
213+
* Handle streaming cases with common logic
214+
*/
215+
function handleStreamingRequest<T extends unknown[], R>(
216+
originalMethod: (...args: T) => Promise<R>,
217+
target: (...args: T) => Promise<R>,
218+
context: unknown,
219+
args: T,
220+
requestAttributes: Record<string, unknown>,
221+
operationName: string,
222+
methodPath: string,
223+
params: Record<string, unknown> | undefined,
224+
options: AnthropicAiOptions,
225+
isStreamRequested: boolean,
226+
): Promise<R> {
227+
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
228+
const spanConfig = {
229+
name: `${operationName} ${model} stream-response`,
230+
op: getSpanOperation(methodPath),
231+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
232+
};
233+
234+
if (isStreamRequested) {
235+
return startSpanManual(spanConfig, async span => {
236+
try {
237+
if (options.recordInputs && params) {
238+
addPrivateRequestAttributes(span, params);
239+
}
240+
const result = await originalMethod.apply(context, args);
241+
return instrumentAsyncIterableStream(
242+
result as AsyncIterable<AnthropicAiStreamingEvent>,
243+
span,
244+
options.recordOutputs ?? false,
245+
) as unknown as R;
246+
} catch (error) {
247+
return handleStreamingError(error, span, methodPath);
248+
}
249+
});
250+
} else {
251+
return startSpanManual(spanConfig, span => {
252+
try {
253+
if (options.recordInputs && params) {
254+
addPrivateRequestAttributes(span, params);
255+
}
256+
const messageStream = target.apply(context, args);
257+
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
258+
} catch (error) {
259+
return handleStreamingError(error, span, methodPath);
260+
}
261+
});
262+
}
263+
}
264+
197265
/**
198266
* Instrument a method with Sentry spans
199267
* Following Sentry AI Agents Manual Instrumentation conventions
@@ -215,79 +283,18 @@ function instrumentMethod<T extends unknown[], R>(
215283
const isStreamRequested = Boolean(params?.stream);
216284
const isStreamingMethod = methodPath === 'messages.stream';
217285

218-
if (isStreamRequested) {
219-
return startSpanManual(
220-
{
221-
name: `${operationName} ${model} stream-response`,
222-
op: getSpanOperation(methodPath),
223-
attributes: requestAttributes as Record<string, SpanAttributeValue>,
224-
},
225-
async span => {
226-
try {
227-
if (options.recordInputs && params) {
228-
addPrivateRequestAttributes(span, params);
229-
}
230-
231-
const result = await originalMethod.apply(context, args);
232-
return instrumentAsyncIterableStream(
233-
result as AsyncIterable<AnthropicAiStreamingEvent>,
234-
span,
235-
options.recordOutputs ?? false,
236-
) as unknown as R;
237-
} catch (error) {
238-
captureException(error, {
239-
mechanism: {
240-
handled: false,
241-
type: 'auto.ai.anthropic',
242-
data: {
243-
function: methodPath,
244-
},
245-
},
246-
});
247-
248-
if (span.isRecording()) {
249-
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
250-
span.end();
251-
}
252-
throw error;
253-
}
254-
},
255-
);
256-
} else if (isStreamingMethod) {
257-
// Create span for instrumentation using startSpanManual
258-
return startSpanManual(
259-
{
260-
name: `${operationName} ${model} stream-response`,
261-
op: getSpanOperation(methodPath),
262-
attributes: requestAttributes as Record<string, SpanAttributeValue>,
263-
},
264-
span => {
265-
try {
266-
if (options.recordInputs && params) {
267-
addPrivateRequestAttributes(span, params);
268-
}
269-
270-
const messageStream = target.apply(context, args);
271-
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
272-
} catch (error) {
273-
captureException(error, {
274-
mechanism: {
275-
handled: false,
276-
type: 'auto.ai.anthropic',
277-
data: {
278-
function: methodPath,
279-
},
280-
},
281-
});
282-
283-
if (span.isRecording()) {
284-
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
285-
span.end();
286-
}
287-
288-
throw error;
289-
}
290-
},
286+
if (isStreamRequested || isStreamingMethod) {
287+
return handleStreamingRequest(
288+
originalMethod,
289+
target,
290+
context,
291+
args,
292+
requestAttributes,
293+
operationName,
294+
methodPath,
295+
params,
296+
options,
297+
isStreamRequested,
291298
);
292299
}
293300

0 commit comments

Comments
 (0)