From ded92ead5baa5a8ec6d2a9fb5996e140142cfa21 Mon Sep 17 00:00:00 2001
From: Shyam Desigan
Date: Sat, 20 Jun 2026 13:00:01 +0000
Subject: [PATCH] feat: Add LiteLLM provider instrumentation for cross-provider LLM tracking

---
Review note: attributes.py and instrumentor.py were revised during review, so
the blob ids on their "index" lines below predate those revisions.

 agentops/instrumentation/__init__.py          |   5 +
 .../providers/litellm/__init__.py             |  12 +
 .../providers/litellm/attributes.py           | 340 ++++++++++++++
 .../providers/litellm/instrumentor.py         | 420 ++++++++++++++++++
 4 files changed, 777 insertions(+)
 create mode 100644 agentops/instrumentation/providers/litellm/__init__.py
 create mode 100644 agentops/instrumentation/providers/litellm/attributes.py
 create mode 100644 agentops/instrumentation/providers/litellm/instrumentor.py

diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py
index 9f1b6a3c8..a83b47db1 100644
--- a/agentops/instrumentation/__init__.py
+++ b/agentops/instrumentation/__init__.py
@@ -73,6 +73,11 @@ class InstrumentorConfig(TypedDict):
         "min_version": "0.1.0",
         "package_name": "mem0ai",
     },
+    "litellm": {
+        "module_name": "agentops.instrumentation.providers.litellm",
+        "class_name": "LiteLLMInstrumentor",
+        "min_version": "1.3.1",
+    },
 }
 
 # Configuration for supported agentic libraries
diff --git a/agentops/instrumentation/providers/litellm/__init__.py b/agentops/instrumentation/providers/litellm/__init__.py
new file mode 100644
index 000000000..a6d66841c
--- /dev/null
+++ b/agentops/instrumentation/providers/litellm/__init__.py
@@ -0,0 +1,12 @@
+"""LiteLLM instrumentation module.
+
+This module provides AgentOps instrumentation for LiteLLM, tracking LLM calls
+regardless of which underlying provider LiteLLM routes to (OpenAI, Anthropic, etc.).
+It wraps litellm.completion() and litellm.acompletion() directly at the LiteLLM
+entry point, ensuring that provider-specific tracking works even when LiteLLM's
+internal handlers bypass the official provider SDKs.
+"""
+
+from agentops.instrumentation.providers.litellm.instrumentor import LiteLLMInstrumentor
+
+__all__ = ["LiteLLMInstrumentor"]
diff --git a/agentops/instrumentation/providers/litellm/attributes.py b/agentops/instrumentation/providers/litellm/attributes.py
new file mode 100644
index 000000000..2e97a34b8
--- /dev/null
+++ b/agentops/instrumentation/providers/litellm/attributes.py
@@ -0,0 +1,340 @@
+"""Attribute extraction for LiteLLM request/response tracking.
+
+This module handles the extraction of telemetry attributes from LiteLLM
+completion requests and responses, including the special handling needed
+for LiteLLM's Responses API response format vs. standard ModelResponse.
+
+LiteLLM normalizes all provider responses to a ModelResponse format, but
+the Responses API endpoint may return ResponsesAPIResponse objects that
+have different attribute paths for usage data.
+"""
+
+from typing import Any, Dict, Optional
+
+from agentops.semconv import SpanAttributes, MessageAttributes
+from agentops.logging import logger
+
+# Known providers supported by LiteLLM with "provider/model" format.
+# Informational only: prefixes missing from this set are still reported as-is
+# so that custom providers remain visible in traces.
+KNOWN_PROVIDERS = {
+    "anthropic",
+    "openai",
+    "azure",
+    "bedrock",
+    "vertex_ai",
+    "vertex_ai_beta",
+    "gemini",
+    "cohere",
+    "mistral",
+    "together_ai",
+    "replicate",
+    "huggingface",
+    "perplexity",
+    "deepseek",
+    "groq",
+    "sambanova",
+    "watsonx",
+    "ai21",
+    "claude",
+    "ollama",
+    "custom",
+    "databricks",
+    "fireworks_ai",
+    "xai",
+    "openrouter",
+}
+
+# Name prefixes that identify the provider of a model given without "provider/".
+_NATIVE_MODEL_PREFIXES = (
+    (("gpt", "o1", "o3", "dall"), "openai"),
+    (("claude",), "anthropic"),
+    (("gemini",), "google"),
+)
+
+
+def extract_provider_from_model(model: str) -> str:
+    """Extract the provider name from a LiteLLM model string.
+
+    LiteLLM uses the format "provider/model_name" (e.g. "anthropic/claude-3-5-sonnet",
+    "openai/gpt-4o", "bedrock/anthropic.claude-3-sonnet").
+
+    Args:
+        model: The model string (e.g. "anthropic/claude-3-5-sonnet-20240620")
+
+    Returns:
+        The provider name (e.g. "anthropic"), or "unknown" if it cannot be determined.
+    """
+    if not model or not isinstance(model, str):
+        return "unknown"
+
+    # "provider/model" format. The prefix is returned even when it is not in
+    # KNOWN_PROVIDERS (custom providers); an empty prefix ("/model") is unknown.
+    if "/" in model:
+        provider_part = model.split("/", 1)[0].lower().strip()
+        return provider_part or "unknown"
+
+    # No provider prefix - treat as native model name
+    for prefixes, provider in _NATIVE_MODEL_PREFIXES:
+        if model.startswith(prefixes):
+            return provider
+
+    return "unknown"
+
+
+def get_request_attributes(kwargs: Dict[str, Any]) -> Dict[str, Any]:
+    """Extract attributes from the LiteLLM request kwargs.
+
+    Parameters passed explicitly as ``None`` (e.g. ``temperature=None``) are
+    skipped, because OpenTelemetry does not accept ``None`` attribute values.
+
+    Args:
+        kwargs: The keyword arguments passed to litellm.completion()
+
+    Returns:
+        A dict of OpenTelemetry span attributes.
+    """
+    attributes: Dict[str, Any] = {}
+
+    # Extract model
+    model = kwargs.get("model", "")
+    if model:
+        model = str(model)
+        attributes[SpanAttributes.LLM_REQUEST_MODEL] = model
+
+        # Extract provider from model string
+        provider = extract_provider_from_model(model)
+        if provider != "unknown":
+            attributes["gen_ai.request.provider"] = provider
+
+        # Extract provider-specific prefix
+        if "/" in model:
+            attributes["llm.litellm.provider"] = model.split("/")[0]
+
+    # Scalar request parameters map one-to-one onto span attributes
+    scalar_parameters = (
+        ("max_tokens", SpanAttributes.LLM_REQUEST_MAX_TOKENS),
+        ("temperature", SpanAttributes.LLM_REQUEST_TEMPERATURE),
+        ("top_p", SpanAttributes.LLM_REQUEST_TOP_P),
+        ("user", "gen_ai.request.user"),
+    )
+    for parameter, attribute_name in scalar_parameters:
+        value = kwargs.get(parameter)
+        if value is not None:
+            attributes[attribute_name] = value
+
+    # Extract stop sequences
+    stop = kwargs.get("stop")
+    if stop is not None:
+        if isinstance(stop, (list, tuple)):
+            attributes["gen_ai.request.stop_sequences"] = ",".join(str(s) for s in stop)
+        else:
+            attributes["gen_ai.request.stop_sequences"] = str(stop)
+
+    # Extract streaming flag
+    attributes["gen_ai.request.stream"] = str(bool(kwargs.get("stream", False)))
+
+    return attributes
+
+
+def get_response_attributes(
+    response: Any,
+    request_kwargs: Optional[Dict[str, Any]] = None,
+) -> Dict[str, Any]:
+    """Extract attributes from the LiteLLM response.
+
+    Handles both standard ModelResponse and ResponsesAPIResponse formats.
+    LiteLLM normalizes most provider responses to ModelResponse, but the
+    Responses API endpoint uses a different response structure.
+
+    Args:
+        response: The response object from litellm.completion()
+        request_kwargs: The original request kwargs (optional, for fallback)
+
+    Returns:
+        A dict of OpenTelemetry span attributes.
+    """
+    attributes: Dict[str, Any] = {}
+
+    try:
+        # Extract model from response
+        model = getattr(response, "model", None)
+        if model:
+            attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(model)
+
+            # Extract provider from model in the response too
+            provider = extract_provider_from_model(str(model))
+            if provider != "unknown":
+                attributes["gen_ai.response.provider"] = provider
+
+        # Extract response id
+        response_id = getattr(response, "id", None)
+        if response_id:
+            attributes[SpanAttributes.LLM_RESPONSE_ID] = str(response_id)
+
+        # Try to extract usage data
+        usage = getattr(response, "usage", None)
+        if usage is not None:
+            _extract_usage_attributes(usage, attributes, request_kwargs)
+        else:
+            # If no usage directly on response, try the Responses API format
+            _extract_responses_api_usage_if_present(response, attributes)
+
+        # Extract completion content from choices
+        choices = getattr(response, "choices", None)
+        if choices:
+            _extract_choice_attributes(choices[0], attributes)
+        else:
+            # Try Responses API output format
+            _extract_responses_api_output(response, attributes)
+
+    except Exception as e:
+        logger.debug(f"[LITELLM ATTRIBUTES] Error extracting response attributes: {e}")
+
+    return attributes
+
+
+def _extract_choice_attributes(choice: Any, attributes: Dict[str, Any]) -> None:
+    """Extract message, finish reason and tool calls from a completion choice.
+
+    Args:
+        choice: An entry of ``response.choices`` (only the first one is recorded)
+        attributes: The attributes dict to update
+    """
+    message = getattr(choice, "message", None)
+    if message is not None:
+        content = getattr(message, "content", None)
+        if content is not None:
+            attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = str(content)
+
+        role = getattr(message, "role", None)
+        if role is not None:
+            attributes[MessageAttributes.COMPLETION_ROLE.format(i=0)] = str(role)
+
+    # Extract finish reason
+    finish_reason = getattr(choice, "finish_reason", None)
+    if finish_reason is not None:
+        attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=0)] = str(finish_reason)
+
+    # Extract tool calls if present
+    for idx, tc in enumerate(getattr(message, "tool_calls", None) or ()):
+        tc_id = getattr(tc, "id", None) or getattr(tc, "id_", None)
+        if tc_id:
+            attributes[f"gen_ai.tool_call.{idx}.id"] = str(tc_id)
+        tc_type = getattr(tc, "type", None)
+        if tc_type:
+            attributes[f"gen_ai.tool_call.{idx}.type"] = str(tc_type)
+        tc_function = getattr(tc, "function", None)
+        if tc_function:
+            tc_name = getattr(tc_function, "name", None)
+            tc_args = getattr(tc_function, "arguments", None)
+            if tc_name:
+                attributes[f"gen_ai.tool_call.{idx}.name"] = str(tc_name)
+            if tc_args:
+                attributes[f"gen_ai.tool_call.{idx}.arguments"] = str(tc_args)
+
+
+def _extract_usage_attributes(
+    usage: Any,
+    attributes: Dict[str, Any],
+    request_kwargs: Optional[Dict[str, Any]] = None,
+) -> None:
+    """Extract token usage attributes from a usage object.
+
+    Args:
+        usage: The usage object from the response
+        attributes: The attributes dict to update
+        request_kwargs: Original request kwargs (optional, currently unused)
+    """
+    # Standard ModelResponse usage format
+    prompt_tokens = getattr(usage, "prompt_tokens", None)
+    completion_tokens = getattr(usage, "completion_tokens", None)
+    total_tokens = getattr(usage, "total_tokens", None)
+
+    # If not found, try LiteLLM-specific usage format (some providers)
+    if prompt_tokens is None:
+        prompt_tokens = getattr(usage, "input_tokens", None)
+    if completion_tokens is None:
+        completion_tokens = getattr(usage, "output_tokens", None)
+
+    # Derive the total when the provider reports only the two partial counts
+    if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
+        total_tokens = int(prompt_tokens) + int(completion_tokens)
+
+    if prompt_tokens is not None:
+        attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = int(prompt_tokens)
+    if completion_tokens is not None:
+        attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = int(completion_tokens)
+    if total_tokens is not None:
+        attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = int(total_tokens)
+
+
+def _extract_responses_api_usage_if_present(response: Any, attributes: Dict[str, Any]) -> None:
+    """Check if response has Responses API style usage and extract it.
+
+    Used when the response has no ``usage`` object: token counts are then
+    looked up directly on the response under either naming convention.
+    """
+    input_tokens = getattr(response, "input_tokens", None)
+    if input_tokens is None:
+        input_tokens = getattr(response, "prompt_tokens", None)
+    if input_tokens is not None:
+        attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = int(input_tokens)
+
+    output_tokens = getattr(response, "output_tokens", None)
+    if output_tokens is None:
+        output_tokens = getattr(response, "completion_tokens", None)
+    if output_tokens is not None:
+        attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = int(output_tokens)
+
+
+def _extract_responses_api_output(response: Any, attributes: Dict[str, Any]) -> None:
+    """Extract output from Responses API format.
+
+    The Responses API uses 'output' instead of 'choices', containing
+    a list of output items (messages, function calls, etc.).
+    """
+    try:
+        output = getattr(response, "output", None)
+        if not output or not isinstance(output, (list, tuple)):
+            return
+
+        # Each function call gets its own index so several calls do not overwrite each other
+        tool_call_index = 0
+        for item in output:
+            item_type = getattr(item, "type", None)
+            if item_type == "message":
+                content_list = getattr(item, "content", None)
+                if content_list and isinstance(content_list, (list, tuple)):
+                    for content_item in content_list:
+                        content_type = getattr(content_item, "type", None)
+                        if content_type == "output_text":
+                            text = getattr(content_item, "text", None)
+                            if text:
+                                attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = str(text)
+                                break
+                # Also try direct text attribute
+                text = getattr(item, "text", None)
+                if text:
+                    attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = str(text)
+            elif item_type == "function_call":
+                fc_name = getattr(item, "name", None)
+                fc_args = getattr(item, "arguments", None)
+                if fc_name:
+                    attributes[f"gen_ai.tool_call.{tool_call_index}.name"] = str(fc_name)
+                if fc_args:
+                    attributes[f"gen_ai.tool_call.{tool_call_index}.arguments"] = str(fc_args)
+                tool_call_index += 1
+            elif item_type in ("reasoning", "thinking"):
+                summary = getattr(item, "summary", None)
+                if summary:
+                    attributes["gen_ai.response.reasoning"] = str(summary)
+                content_list = getattr(item, "content", None)
+                if content_list and isinstance(content_list, (list, tuple)):
+                    for content_item in content_list:
+                        text = getattr(content_item, "text", None)
+                        if text:
+                            attributes["gen_ai.response.reasoning"] = str(text)
+                            break
+    except Exception as e:
+        logger.debug(f"[LITELLM ATTRIBUTES] Error extracting Responses API output: {e}")
diff --git a/agentops/instrumentation/providers/litellm/instrumentor.py b/agentops/instrumentation/providers/litellm/instrumentor.py
new file mode 100644
index 000000000..1643dede8
--- /dev/null
+++ b/agentops/instrumentation/providers/litellm/instrumentor.py
@@ -0,0 +1,420 @@
+"""LiteLLM Instrumentation for AgentOps
+
+This module provides automatic instrumentation for LiteLLM, enabling AgentOps
+to track LLM calls through the LiteLLM entry point regardless of the underlying
+provider.
+
+Why LiteLLM needs its own instrumentation:
+- When LiteLLM calls litellm.completion(model="anthropic/claude-3-5-sonnet-..."),
+  LiteLLM uses its own internal provider handler that makes direct HTTP calls.
+  It does NOT use the official Anthropic SDK, so AgentOps' Anthropic instrumentor
+  cannot capture those calls.
+- Similarly, for other non-OpenAI providers (Bedrock, Gemini, Cohere, etc.),
+  LiteLLM routes through its own handlers, bypassing the provider SDKs.
+- By wrapping litellm.completion() and litellm.acompletion() directly, we capture
+  ALL provider calls at the LiteLLM entry point.
+
+This instrumentation handles:
+- Standard completion requests (non-streaming)
+- Streaming completion requests
+- Both sync and async methods
+- Responses API response format (used by OpenAI /v1/responses)
+"""
+
+import time
+from typing import Any, Dict, Optional, Tuple
+
+from opentelemetry.trace import SpanKind, Status, StatusCode
+from opentelemetry import context as context_api
+from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY
+from opentelemetry.trace import Span, set_span_in_context
+from opentelemetry.metrics import Meter
+from wrapt import wrap_function_wrapper
+
+from agentops.logging import logger
+from agentops.instrumentation.common import (
+    CommonInstrumentor,
+    InstrumentorConfig,
+    StandardMetrics,
+)
+from agentops.instrumentation.providers.litellm.attributes import (
+    _extract_usage_attributes,
+    get_request_attributes,
+    get_response_attributes,
+)
+from agentops.semconv import SpanAttributes, LLMRequestTypeValues, MessageAttributes
+
+LIBRARY_NAME = "litellm"
+LIBRARY_VERSION = "1.3.1"  # Minimum version required
+
+_instruments = ("litellm >= 1.3.1",)
+
+
+class LiteLLMInstrumentor(CommonInstrumentor):
+    """Instrumentor for LiteLLM's completion methods.
+
+    Wraps litellm.completion() and litellm.acompletion() to track LLM calls
+    across all providers that LiteLLM supports. This is necessary because
+    LiteLLM uses internal handlers for many providers that bypass the official
+    SDKs, making provider-specific instrumentation insufficient.
+    """
+
+    def __init__(self):
+        config = InstrumentorConfig(
+            library_name=LIBRARY_NAME,
+            library_version=LIBRARY_VERSION,
+            wrapped_methods=[],
+            metrics_enabled=True,
+            dependencies=_instruments,
+        )
+        super().__init__(config)
+
+    def _custom_wrap(self, **kwargs):
+        """Wrap litellm.completion and litellm.acompletion."""
+        if not self._tracer:
+            logger.debug("[LITELLM INSTRUMENTOR] No tracer available, skipping wrapping")
+            return
+
+        try:
+            # Wrap sync completion
+            wrap_function_wrapper(
+                "litellm",
+                "completion",
+                _completion_wrapper(self._tracer),
+            )
+
+            # Wrap async completion
+            wrap_function_wrapper(
+                "litellm",
+                "acompletion",
+                _async_completion_wrapper(self._tracer),
+            )
+
+            logger.debug("[LITELLM INSTRUMENTOR] Successfully wrapped litellm.completion and litellm.acompletion")
+        except Exception as e:
+            logger.warning(f"[LITELLM INSTRUMENTOR] Error setting up wrappers: {e}")
+
+    def _custom_unwrap(self, **kwargs):
+        """Restore the original litellm.completion and litellm.acompletion."""
+        try:
+            import litellm
+            from opentelemetry.instrumentation.utils import unwrap
+
+            unwrap(litellm, "completion")
+            unwrap(litellm, "acompletion")
+        except Exception as e:
+            logger.debug(f"[LITELLM INSTRUMENTOR] Error removing wrappers: {e}")
+
+    def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
+        """Create metrics for LiteLLM instrumentation."""
+        return StandardMetrics.create_standard_metrics(meter)
+
+
+def _merge_positional_model(args: tuple, kwargs: Dict[str, Any]) -> Dict[str, Any]:
+    """Return the request kwargs with a positionally passed model included.
+
+    ``model`` is the first parameter of litellm.completion()/acompletion(), so
+    ``litellm.completion("gpt-4o", messages=...)`` carries it in ``args``.
+    """
+    if args and "model" not in kwargs:
+        return {**kwargs, "model": args[0]}
+    return kwargs
+
+
+def _start_completion_span(tracer, operation: str, request_kwargs: Dict[str, Any]) -> Tuple[Span, str]:
+    """Start the CLIENT span for one completion call.
+
+    Args:
+        tracer: The tracer used to create the span
+        operation: Span name prefix, e.g. "litellm.completion"
+        request_kwargs: The request kwargs used to derive span attributes
+
+    Returns:
+        A ``(span, model_str)`` tuple; ``model_str`` is "unknown" when no model was given.
+    """
+    request_attributes = get_request_attributes(request_kwargs)
+    model_str = request_attributes.get(SpanAttributes.LLM_REQUEST_MODEL, "unknown")
+    provider = request_attributes.get("gen_ai.request.provider", "unknown")
+
+    span = tracer.start_span(
+        f"{operation} ({model_str})",
+        kind=SpanKind.CLIENT,
+        attributes={
+            SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value,
+            "llm.litellm.model": model_str,
+            "llm.litellm.provider": provider,
+            **request_attributes,
+        },
+    )
+    return span, model_str
+
+
+def _lookup(container: Any, key: str) -> Any:
+    """Read ``key`` from a dict or, failing that, as an attribute; None if absent."""
+    if isinstance(container, dict):
+        return container.get(key)
+    return getattr(container, key, None)
+
+
+def _record_cost(span: Span, response: Any) -> None:
+    """Record the call cost when LiteLLM exposes it on the response.
+
+    ``_hidden_params`` is a dict on LiteLLM responses, so it is read with
+    key access (attribute access on a dict would never find the values).
+    """
+    hidden_params = getattr(response, "_hidden_params", None)
+    if not hidden_params:
+        return
+
+    cost = _lookup(hidden_params, "response_cost")
+    if cost is None:
+        cost = _lookup(_lookup(hidden_params, "additional_headers") or {}, "x-litellm-cost")
+    if cost is None:
+        return
+
+    try:
+        span.set_attribute("llm.litellm.cost", float(cost))
+    except (TypeError, ValueError):
+        logger.debug(f"[LITELLM] Ignoring non-numeric cost value: {cost!r}")
+
+
+def _fail_span(span: Span, error: BaseException) -> None:
+    """Record ``error`` on the span, mark it as failed and end it."""
+    span.record_exception(error)
+    span.set_status(Status(StatusCode.ERROR, str(error)))
+    span.end()
+
+
+def _complete_span(span: Span, response: Any, request_kwargs: Dict[str, Any]) -> None:
+    """Record response attributes and cost on the span, then end it.
+
+    Telemetry failures are logged and never propagate to the caller.
+    """
+    try:
+        for key, value in get_response_attributes(response, request_kwargs).items():
+            span.set_attribute(key, value)
+        _record_cost(span, response)
+        span.set_status(Status(StatusCode.OK))
+    except Exception as e:
+        logger.debug(f"[LITELLM] Error recording response attributes: {e}")
+    finally:
+        span.end()
+
+
+def _completion_wrapper(tracer):
+    """Wrapper for litellm.completion()."""
+
+    def wrapper(wrapped, instance, args, kwargs):
+        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
+            return wrapped(*args, **kwargs)
+
+        request_kwargs = _merge_positional_model(args, kwargs)
+        span, model_str = _start_completion_span(tracer, "litellm.completion", request_kwargs)
+
+        # The span is the current one only while the wrapped call runs; the
+        # context is always detached here, in the context that attached it.
+        token = context_api.attach(set_span_in_context(span, context_api.get_current()))
+        start_time = time.perf_counter()
+        try:
+            response = wrapped(*args, **kwargs)
+        except BaseException as e:  # also ends the span on KeyboardInterrupt
+            _fail_span(span, e)
+            raise
+        finally:
+            context_api.detach(token)
+
+        span.set_attribute("llm.litellm.duration_ms", (time.perf_counter() - start_time) * 1000)
+
+        if kwargs.get("stream", False):
+            # For streaming, the span is ended by the stream wrapper
+            logger.debug(f"[LITELLM] Wrapping streaming response for model: {model_str}")
+            return _LiteLLMStreamWrapper(response, span, request_kwargs, start_time=start_time)
+
+        _complete_span(span, response, request_kwargs)
+        logger.debug(f"[LITELLM] Completed non-streaming span for model: {model_str}")
+        return response
+
+    return wrapper
+
+
+def _async_completion_wrapper(tracer):
+    """Wrapper for litellm.acompletion()."""
+
+    async def wrapper(wrapped, instance, args, kwargs):
+        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
+            return await wrapped(*args, **kwargs)
+
+        request_kwargs = _merge_positional_model(args, kwargs)
+        span, model_str = _start_completion_span(tracer, "litellm.acompletion", request_kwargs)
+
+        # See _completion_wrapper: attach and detach stay within this coroutine.
+        token = context_api.attach(set_span_in_context(span, context_api.get_current()))
+        start_time = time.perf_counter()
+        try:
+            response = await wrapped(*args, **kwargs)
+        except BaseException as e:  # includes asyncio.CancelledError
+            _fail_span(span, e)
+            raise
+        finally:
+            context_api.detach(token)
+
+        span.set_attribute("llm.litellm.duration_ms", (time.perf_counter() - start_time) * 1000)
+
+        if kwargs.get("stream", False):
+            # For streaming, the span is ended by the stream wrapper
+            logger.debug(f"[LITELLM] Wrapping async streaming response for model: {model_str}")
+            return _LiteLLMStreamWrapper(response, span, request_kwargs, start_time=start_time)
+
+        _complete_span(span, response, request_kwargs)
+        logger.debug(f"[LITELLM] Completed non-streaming span for model: {model_str}")
+        return response
+
+    return wrapper
+
+
+class _LiteLLMStreamWrapper:
+    """Wrapper for LiteLLM streaming responses.
+
+    Handles both sync and async streaming iterations, aggregating chunks
+    and collecting usage data when the stream is complete. The span is ended
+    exactly once: on exhaustion, on an iteration error, or on context-manager exit.
+    """
+
+    def __init__(self, stream: Any, span: Span, kwargs: dict, start_time: Optional[float] = None):
+        self._stream = stream
+        self._span = span
+        self._kwargs = kwargs
+        # Measure from the start of the request when the caller provides it
+        self._start_time = time.perf_counter() if start_time is None else start_time
+        self._chunk_count = 0
+        self._content_chunks = []
+        self._finished = False
+        self._usage = None
+        self._model = None
+        # NOTE: no context is attached here. A stream may be consumed in another
+        # task/thread or never exhausted, so a token could not be detached safely.
+
+    def __getattr__(self, name):
+        # Only reached for names missing on the wrapper: expose the wrapped
+        # stream's own attributes so callers can keep using them.
+        if name == "_stream":
+            raise AttributeError(name)
+        return getattr(self._stream, name)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            chunk = next(self._stream)
+        except StopIteration:
+            self._finalize()
+            raise
+        except BaseException as e:
+            self._finalize(error=e)
+            raise
+        self._process_chunk(chunk)
+        return chunk
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        try:
+            if hasattr(self._stream, "__anext__"):
+                chunk = await self._stream.__anext__()
+            else:
+                chunk = next(self._stream)
+        except StopAsyncIteration:
+            self._finalize()
+            raise
+        except StopIteration:
+            self._finalize()
+            raise StopAsyncIteration from None
+        except BaseException as e:
+            self._finalize(error=e)
+            raise
+        self._process_chunk(chunk)
+        return chunk
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._finalize(error=exc_val)
+        return False
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        self._finalize(error=exc_val)
+        return False
+
+    def _process_chunk(self, chunk):
+        """Process a streaming chunk from LiteLLM."""
+        self._chunk_count += 1
+
+        # Try to extract model from chunk
+        if self._model is None:
+            model = getattr(chunk, "model", None)
+            if model:
+                self._model = model
+                self._span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, str(model))
+
+        # Try to extract content from chunk
+        choices = getattr(chunk, "choices", None)
+        if choices:
+            delta = getattr(choices[0], "delta", None)
+            if delta:
+                content = getattr(delta, "content", None)
+                if content:
+                    self._content_chunks.append(content)
+
+        # Try to extract usage from chunk (usually in the final chunk)
+        usage = getattr(chunk, "usage", None)
+        if usage:
+            self._usage = usage
+
+    def _finalize(self, error: Optional[BaseException] = None):
+        """Finalize the streaming span with aggregated data.
+
+        Args:
+            error: The exception that interrupted the stream, if any; the span
+                is then marked as failed instead of OK.
+        """
+        if self._finished:
+            return
+        self._finished = True
+
+        try:
+            duration = time.perf_counter() - self._start_time
+            self._span.set_attribute("llm.litellm.duration_ms", duration * 1000)
+            self._span.set_attribute("llm.litellm.stream.chunk_count", self._chunk_count)
+
+            # Aggregate content
+            if self._content_chunks:
+                full_content = "".join(self._content_chunks)
+                self._span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_content)
+
+            # Set usage from aggregated data
+            if self._usage:
+                usage_attributes: Dict[str, Any] = {}
+                _extract_usage_attributes(self._usage, usage_attributes)
+                for key, value in usage_attributes.items():
+                    self._span.set_attribute(key, value)
+
+            if error is not None:
+                self._span.record_exception(error)
+                self._span.set_status(Status(StatusCode.ERROR, str(error)))
+            else:
+                self._span.set_status(Status(StatusCode.OK))
+
+            logger.debug(
+                f"[LITELLM] Finalized streaming span: {self._chunk_count} chunks, "
+                f"{len(self._content_chunks)} content deltas"
+            )
+        except Exception as e:
+            logger.debug(f"[LITELLM] Error finalizing stream: {e}")
+        finally:
+            self._span.end()