Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,037 changes: 1,037 additions & 0 deletions src/backend-api/src/app/libs/logging/llm_token_telemetry.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/processor/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dependencies = [
"azure-ai-projects==2.0.0b3",
"azure-appconfiguration==1.7.2",
"azure-core==1.38.0",
"azure-monitor-events-extension==0.1.0",
"azure-monitor-opentelemetry==1.8.7",
"azure-cosmos==4.15.0",
"azure-identity==1.26.0b1",
"azure-storage-blob==12.28.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,140 @@
logger = logging.getLogger(__name__)


def _extract_tokens_from_dict_or_obj(ud: Any) -> tuple[int, int, int]:
"""Extract (input, output, total) token counts from a dict or object."""
inp = out = tot = 0
if isinstance(ud, dict):
inp = ud.get("input_token_count", 0) or ud.get("input_tokens", 0) or 0
out = ud.get("output_token_count", 0) or ud.get("output_tokens", 0) or 0
tot = ud.get("total_token_count", 0) or ud.get("total_tokens", 0) or 0
else:
inp = getattr(ud, "input_token_count", 0) or getattr(ud, "input_tokens", 0) or 0
out = getattr(ud, "output_token_count", 0) or getattr(ud, "output_tokens", 0) or 0
tot = getattr(ud, "total_token_count", 0) or getattr(ud, "total_tokens", 0) or 0
if not tot:
tot = int(inp) + int(out)
return int(inp), int(out), int(tot)


def _try_emit_token_event(inp: int, out: int, tot: int, source: str) -> None:
"""Log token usage found in stream/response for diagnostics.

The actual LLM_Token_Usage event is emitted by TokenUsageTracker.record()
in the orchestrator, which has full context (agent, step, model, user).
This function only logs for debugging to avoid duplicate events.
"""
if tot > 0 or inp > 0 or out > 0:
logger.info(
"[TOKEN_STREAM] usage found: input=%s output=%s total=%s source=%s",
inp, out, tot, source,
)


def _emit_usage_from_stream_item(item: Any) -> None:
"""Check a streamed ChatResponseUpdate for usage Content and emit an App Insights event.

Checks multiple locations where usage data may appear:
1. item.contents[] with type="usage" and usage_details
2. item.usage (direct attribute - some SDK versions)
3. item.metadata with usage keys
"""
try:
item_type = type(item).__name__

# --- Path 1: contents list with Content(type="usage") ---
contents = getattr(item, "contents", None)
if contents:
for content in contents:
ctype = getattr(content, "type", None)
if ctype == "usage":
# SDK UsageContent uses "details"; fall back to "usage_details"
ud = getattr(content, "details", None) or getattr(content, "usage_details", None)
if ud:
inp, out, tot = _extract_tokens_from_dict_or_obj(ud)
_try_emit_token_event(inp, out, tot, "stream_contents")
return

# --- Path 2: direct .usage attribute ---
usage = getattr(item, "usage", None)
if usage is not None:
inp, out, tot = _extract_tokens_from_dict_or_obj(usage)
_try_emit_token_event(inp, out, tot, "stream_usage_attr")
return

# --- Path 3: .metadata dict with usage keys ---
metadata = getattr(item, "metadata", None)
if isinstance(metadata, dict):
if any(k in metadata for k in ("input_tokens", "input_token_count", "usage")):
usage_data = metadata.get("usage", metadata)
inp, out, tot = _extract_tokens_from_dict_or_obj(usage_data)
_try_emit_token_event(inp, out, tot, "stream_metadata")
return

# --- Diagnostic: log item shape for debugging (only for non-text items) ---
if contents:
content_types = [getattr(c, "type", "?") for c in contents]
if any(t not in ("text",) for t in content_types):
logger.debug(
"[TOKEN_DIAG] item_type=%s content_types=%s attrs=%s",
item_type,
content_types,
[a for a in dir(item) if not a.startswith("_")],
)
except Exception as e:
logger.debug("[TOKEN_STREAM] error in emit: %s", e)


def _emit_usage_from_response(response: Any) -> None:
"""Extract and emit token usage from a non-streaming ChatResponse.

Checks usage_details (SDK attribute) and contents for UsageContent items.
"""
try:
# Path 1: response.usage_details (ChatResponse from SDK)
ud = getattr(response, "usage_details", None) or getattr(response, "details", None)
if ud is not None:
inp, out, tot = _extract_tokens_from_dict_or_obj(ud)
_try_emit_token_event(inp, out, tot, "response_usage_details")
return

# Path 2: response.usage direct attribute
usage = getattr(response, "usage", None)
if usage is not None:
inp, out, tot = _extract_tokens_from_dict_or_obj(usage)
_try_emit_token_event(inp, out, tot, "response_usage_attr")
return

# Path 3: contents list with UsageContent
contents = getattr(response, "contents", None)
if contents:
for content in contents:
ctype = getattr(content, "type", None)
if ctype == "usage":
ud = getattr(content, "details", None) or getattr(content, "usage_details", None)
if ud:
inp, out, tot = _extract_tokens_from_dict_or_obj(ud)
_try_emit_token_event(inp, out, tot, "response_contents")
return

# Path 4: messages list with usage content
messages = getattr(response, "messages", None)
if messages:
for msg in messages:
msg_contents = getattr(msg, "contents", None)
if not msg_contents:
continue
for item in msg_contents:
if getattr(item, "type", None) == "usage":
ud = getattr(item, "details", None) or getattr(item, "usage_details", None)
if ud:
inp, out, tot = _extract_tokens_from_dict_or_obj(ud)
_try_emit_token_event(inp, out, tot, "response_msg_contents")
return
except Exception as e:
logger.debug("[TOKEN_RESPONSE] error in emit: %s", e)


def _format_exc_brief(exc: BaseException) -> str:
name = type(exc).__name__
msg = str(exc)
Expand Down Expand Up @@ -78,6 +212,20 @@ def _looks_like_rate_limit(error: BaseException) -> bool:
if isinstance(status, int) and 500 <= status < 600:
return True

# "The model produced invalid content" is a transient error from Azure OpenAI
# when the model output fails content/schema validation — worth retrying.
# "No tool call found" is a 400 error when the conversation has orphaned
# function call outputs with no matching tool call request.
if any(
s in msg
for s in [
"model produced invalid content",
"invalid content",
"no tool call found",
]
):
return True

cause = getattr(error, "__cause__", None)
if cause and cause is not error:
return _looks_like_rate_limit(cause)
Expand Down Expand Up @@ -548,12 +696,15 @@ async def _inner_get_response(
)

try:
return await _retry_call(
response = await _retry_call(
lambda: parent_inner_get_response(
messages=effective_messages, chat_options=chat_options, **kwargs
),
config=self._retry_config,
)
# Extract and emit token usage from non-streaming response
_emit_usage_from_response(response)
return response
except Exception as e:
if not (
self._context_trim_config.enabled
Expand Down Expand Up @@ -643,8 +794,36 @@ async def _tail():
async for item in iterator:
yield item

_item_count = 0
_last_item = None
async for item in _tail():
_item_count += 1
_last_item = item
_emit_usage_from_stream_item(item)
yield item

# After stream completes, log diagnostic about the last item
if _last_item is not None:
try:
_attrs = [a for a in dir(_last_item) if not a.startswith("_")]
_contents = getattr(_last_item, "contents", None)
_content_info = []
if _contents:
for _c in _contents:
_ct = getattr(_c, "type", "?")
_ca = [a for a in dir(_c) if not a.startswith("_")]
_content_info.append({"type": _ct, "attrs": _ca})
_usage_attr = getattr(_last_item, "usage", None)
logger.info(
"[TOKEN_DIAG_FINAL] stream_items=%d last_item_type=%s attrs=%s contents=%s usage_attr=%s",
_item_count,
type(_last_item).__name__,
_attrs,
_content_info,
repr(_usage_attr) if _usage_attr is not None else "None",
)
except Exception:
pass
return
except StopAsyncIteration:
return
Expand Down
Loading