Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,21 @@ private static Duration resolveCallTimeout(HttpOptions httpOptions) {
public Flowable<LlmResponse> complete(LlmRequest llmRequest, boolean stream) {
return Flowable.defer(
() -> {
String effectiveModelName = llmRequest.model().orElse("?");
logger.trace("Chat Completion Request Contents: {}", llmRequest.contents());
llmRequest.config().ifPresent(c -> logger.trace("Chat Completion Request Config: {}", c));

ChatCompletionsRequest dtoRequest =
ChatCompletionsRequest.fromLlmRequest(llmRequest, stream);
String jsonPayload = objectMapper.writeValueAsString(dtoRequest);
logger.trace(
"Chat Completion Request: model={}, stream={}, messagesCount={}",
dtoRequest.model,
dtoRequest.stream,
dtoRequest.messages != null ? dtoRequest.messages.size() : 0);
logger.trace("Chat Completion Request JSON: {}", jsonPayload);

if (stream) {
logger.debug(
"Sending streaming chat-completion request to model {}", effectiveModelName);
} else {
logger.debug("Sending chat-completion request to model {}", effectiveModelName);
}

Request.Builder requestBuilder =
new Request.Builder().url(completionsUrl).post(RequestBody.create(jsonPayload, JSON));
Expand All @@ -209,11 +216,7 @@ public Flowable<LlmResponse> complete(LlmRequest llmRequest, boolean stream) {
requestBuilder.header("Content-Type", JSON.toString());

Request request = requestBuilder.build();
if (stream) {
return createStreamingFlowable(request);
} else {
return createNonStreamingFlowable(request);
}
return stream ? createStreamingFlowable(request) : createNonStreamingFlowable(request);
});
}

Expand Down Expand Up @@ -274,10 +277,14 @@ public void onResponse(Call call, Response response) {
// A single malformed chunk must not abort the entire stream. Log a
// warning and continue.
try {
logger.trace("Raw streaming chat-completion chunk: {}", data);
ChatCompletionsResponse.ChatCompletionChunk chunk =
objectMapper.readValue(
data, ChatCompletionsResponse.ChatCompletionChunk.class);
ImmutableList<LlmResponse> responses = collection.processChunk(chunk);
if (!responses.isEmpty()) {
logger.trace("Responses to emit: {}", responses);
}
for (LlmResponse resp : responses) {
emitter.onNext(resp);
}
Expand Down Expand Up @@ -341,9 +348,12 @@ public void onResponse(Call call, Response response) {
}

String jsonResponse = body.string();
logger.trace("Raw non-streaming chat-completion response: {}", jsonResponse);
ChatCompletionsResponse.ChatCompletion completion =
objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class);
emitter.onNext(completion.toLlmResponse());
LlmResponse llmResponse = completion.toLlmResponse();
logger.trace("Response to emit: {}", llmResponse);
emitter.onNext(llmResponse);
emitter.onComplete();
} catch (Exception e) {
emitter.tryOnError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.adk.JsonBaseModel;
import com.google.adk.models.LlmRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.genai.types.Content;
import com.google.genai.types.FunctionDeclaration;
import com.google.genai.types.FunctionResponse;
Expand Down Expand Up @@ -351,41 +352,43 @@ private static List<Message> processContent(Content content) {
List<ChatCompletionsCommon.ToolCall> toolCalls = new ArrayList<>();
List<Message> toolResponses = new ArrayList<>();
List<String> refusals = new ArrayList<>();

content
.parts()
.ifPresent(
parts -> {
for (Part part : parts) {
if (part.text().isPresent()) {
// Text Parts may carry refusal content prefixed with REFUSAL_PREFIX.
ChatCompletionsCommon.RefusalSplit split =
ChatCompletionsCommon.parseRefusalPrefix(part.text().get());
if (split.content() != null) {
ContentPart textPart = new ContentPart();
textPart.type = "text";
textPart.text = split.content();
contentParts.add(textPart);
}
if (split.refusal() != null) {
refusals.add(split.refusal());
}
} else if (part.inlineData().isPresent()) {
contentParts.add(processInlineDataPart(part));
} else if (part.fileData().isPresent()) {
contentParts.add(processFileDataPart(part));
} else if (part.functionCall().isPresent()) {
toolCalls.add(processFunctionCallPart(part));
} else if (part.functionResponse().isPresent()) {
toolResponses.add(processFunctionResponsePart(part));
} else if (part.executableCode().isPresent()) {
logger.warn("Executable code is not supported in Chat Completion conversion");
} else if (part.codeExecutionResult().isPresent()) {
logger.warn(
"Code execution result is not supported in Chat Completion conversion");
}
}
});
// Capture a message-level thought_signature from the first text Part that carries one.
// This signature must be echoed back on subsequent turns to ensure proper round-tripping.
byte[] textThoughtSignature = null;

if (content.parts().isPresent()) {
for (Part part : content.parts().get()) {
if (part.text().isPresent()) {
// Text Parts may carry refusal content prefixed with REFUSAL_PREFIX.
ChatCompletionsCommon.RefusalSplit split =
ChatCompletionsCommon.parseRefusalPrefix(part.text().get());
if (split.content() != null) {
ContentPart textPart = new ContentPart();
textPart.type = "text";
textPart.text = split.content();
contentParts.add(textPart);
}
if (split.refusal() != null) {
refusals.add(split.refusal());
}
if (textThoughtSignature == null && part.thoughtSignature().isPresent()) {
textThoughtSignature = part.thoughtSignature().get();
}
} else if (part.inlineData().isPresent()) {
contentParts.add(processInlineDataPart(part));
} else if (part.fileData().isPresent()) {
contentParts.add(processFileDataPart(part));
} else if (part.functionCall().isPresent()) {
toolCalls.add(processFunctionCallPart(part));
} else if (part.functionResponse().isPresent()) {
toolResponses.add(processFunctionResponsePart(part));
} else if (part.executableCode().isPresent()) {
logger.warn("Executable code is not supported in Chat Completion conversion");
} else if (part.codeExecutionResult().isPresent()) {
logger.warn("Code execution result is not supported in Chat Completion conversion");
}
}
}

if (!toolResponses.isEmpty()) {
return toolResponses;
Expand All @@ -403,6 +406,14 @@ private static List<Message> processContent(Content content) {
msg.content = new MessageContent(ImmutableList.copyOf(contentParts));
}
}
// Round-trip the message-level thought_signature for assistant text responses.
if (textThoughtSignature != null) {
msg.extraContent =
ImmutableMap.of(
"google",
ImmutableMap.of(
"thought_signature", Base64.getEncoder().encodeToString(textThoughtSignature)));
}
List<Message> messages = new ArrayList<>();
messages.add(msg);
return messages;
Expand Down Expand Up @@ -446,6 +457,10 @@ private static ContentPart processFileDataPart(Part part) {
/**
* Processes a function call part and returns a mapped ToolCall.
*
* <p>If the source {@link Part} carries a {@code thoughtSignature}, it is round-tripped back out
* as a base64-encoded string in {@code extra_content.google.thought_signature} to satisfy
* endpoint requirements.
*
* @param part The input part containing a requested function call or invocation.
* @return The mapped function call tool call.
*/
Expand All @@ -464,6 +479,13 @@ private static ChatCompletionsCommon.ToolCall processFunctionCallPart(Part part)
}
}
toolCall.function = function;
part.thoughtSignature()
.ifPresent(
sigBytes -> {
String sig = Base64.getEncoder().encodeToString(sigBytes);
toolCall.extraContent =
ImmutableMap.of("google", ImmutableMap.of("thought_signature", sig));
});
return toolCall;
}

Expand Down Expand Up @@ -616,6 +638,13 @@ static class Message {

/** See class definition for more details. */
public String refusal;

/**
* Message-level additional parameters used by some providers. Used for round-tripping data like
* {@code extra_content.google.thought_signature}.
*/
@JsonProperty("extra_content")
public Map<String, Object> extraContent;
}

/**
Expand Down
Loading