Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions astrbot/core/provider/sources/anthropic_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,131 @@ def _prepare_payload(self, messages: list[dict]):

return system_prompt, new_messages

@staticmethod
def _merge_consecutive_anthropic_messages(messages: list[Any]) -> list[Any]:
"""Merge adjacent Anthropic messages with the same role.

Args:
messages: Anthropic messages to merge.

Returns:
Merged Anthropic messages. When merging user messages, tool result
blocks are moved before other blocks to satisfy Anthropic ordering.
"""
merged: list[Any] = []
for msg in messages:
if not isinstance(msg, dict):
merged.append(msg)
continue

if (
msg.get("role")
and merged
and isinstance(merged[-1], dict)
and merged[-1].get("role") == msg.get("role")
):
prev = merged[-1]
prev_content = prev.get("content") or []
if isinstance(prev_content, str):
prev_content = [{"type": "text", "text": prev_content}]
elif isinstance(prev_content, list):
prev_content = list(prev_content)
else:
prev_content = [prev_content]

cur_content = msg.get("content") or []
if isinstance(cur_content, str):
cur_content = [{"type": "text", "text": cur_content}]
elif isinstance(cur_content, list):
cur_content = list(cur_content)
else:
cur_content = [cur_content]

combined_content = prev_content + cur_content
if msg.get("role") == "user":
tool_results = [
block
for block in combined_content
if isinstance(block, dict)
and block.get("type") == "tool_result"
]
if tool_results:
combined_content = tool_results + [
block
for block in combined_content
if not (
isinstance(block, dict)
and block.get("type") == "tool_result"
)
]

merged[-1] = {**prev, "content": combined_content}
else:
merged.append(msg)

return merged

@staticmethod
def _sanitize_assistant_messages(payloads: dict) -> None:
"""Remove orphaned tool results from Anthropic messages.

Args:
payloads: Anthropic request payload containing a messages list.

Returns:
None. The messages list is updated in place on ``payloads``.
"""
messages = payloads.get("messages")
if not isinstance(messages, list):
return

merged = ProviderAnthropic._merge_consecutive_anthropic_messages(messages)
sanitized: list[Any] = []
pending_tool_use_ids: set[str] = set()
for msg in merged:
if not isinstance(msg, dict):
sanitized.append(msg)
pending_tool_use_ids = set()
continue

role = msg.get("role")
content = msg.get("content")
if role == "assistant":
pending_tool_use_ids = set()
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tool_use_id = block.get("id")
if tool_use_id:
pending_tool_use_ids.add(tool_use_id)
sanitized.append(msg)
continue

if role == "user" and isinstance(content, list):
tool_results: list[Any] = []
other_blocks: list[Any] = []
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
tool_use_id = block.get("tool_use_id")
if tool_use_id in pending_tool_use_ids:
tool_results.append(block)
pending_tool_use_ids.remove(tool_use_id)
continue
other_blocks.append(block)

cleaned_content = tool_results + other_blocks
if cleaned_content:
sanitized.append({**msg, "content": cleaned_content})
pending_tool_use_ids = set()
continue

sanitized.append(msg)
pending_tool_use_ids = set()

payloads["messages"] = ProviderAnthropic._merge_consecutive_anthropic_messages(
sanitized
)

def _extract_usage(self, usage: Usage | None) -> TokenUsage:
if usage is None:
return TokenUsage()
Expand Down Expand Up @@ -373,6 +498,7 @@ async def _query(
if "max_tokens" not in payloads:
payloads["max_tokens"] = 65536
self._apply_thinking_config(payloads)
self._sanitize_assistant_messages(payloads)

try:
completion = await retry_provider_request(
Expand Down Expand Up @@ -473,6 +599,7 @@ async def _query_stream(
if "max_tokens" not in payloads:
payloads["max_tokens"] = 65536
self._apply_thinking_config(payloads)
self._sanitize_assistant_messages(payloads)

async with retry_provider_request_context(
"Anthropic",
Expand Down
186 changes: 186 additions & 0 deletions tests/test_anthropic_kimi_code_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,192 @@ def test_prepare_payload_does_not_merge_non_consecutive_tool_results():
]


def test_sanitize_assistant_messages_removes_orphaned_tool_results_and_merges():
payloads = {
"messages": [
{
"role": "assistant",
"content": [{"type": "text", "text": "First answer."}],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "missing_call",
"content": "stale result",
}
],
},
{
"role": "assistant",
"content": [{"type": "text", "text": "Second answer."}],
},
]
}

anthropic_source.ProviderAnthropic._sanitize_assistant_messages(payloads)

assert payloads["messages"] == [
{
"role": "assistant",
"content": [
{"type": "text", "text": "First answer."},
{"type": "text", "text": "Second answer."},
],
}
]


def test_sanitize_assistant_messages_keeps_valid_tool_results_only():
payloads = {
"messages": [
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "call_00",
"name": "read_file",
"input": {"path": "/tmp/one.txt"},
},
{
"type": "tool_use",
"id": "",
"name": "bad_tool",
"input": {},
},
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "call_00",
"content": "one",
},
{
"type": "tool_result",
"tool_use_id": "",
"content": "empty id should not be valid",
},
],
},
]
}

anthropic_source.ProviderAnthropic._sanitize_assistant_messages(payloads)

assert payloads["messages"][1]["content"] == [
{"type": "tool_result", "tool_use_id": "call_00", "content": "one"}
]


def test_sanitize_assistant_messages_removes_stale_duplicate_tool_result():
payloads = {
"messages": [
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "call_00",
"name": "read_file",
"input": {"path": "/tmp/one.txt"},
}
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "call_00",
"content": "one",
}
],
},
{
"role": "assistant",
"content": [{"type": "text", "text": "Done."}],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "call_00",
"content": "stale duplicate",
}
],
},
]
}

anthropic_source.ProviderAnthropic._sanitize_assistant_messages(payloads)

assert payloads["messages"] == [
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "call_00",
"name": "read_file",
"input": {"path": "/tmp/one.txt"},
}
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_00", "content": "one"}
],
},
{
"role": "assistant",
"content": [{"type": "text", "text": "Done."}],
},
]


def test_sanitize_assistant_messages_puts_tool_results_before_user_text():
payloads = {
"messages": [
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "call_00",
"name": "read_file",
"input": {"path": "/tmp/one.txt"},
}
],
},
{"role": "user", "content": "continue"},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "call_00",
"content": "one",
}
],
},
]
}

anthropic_source.ProviderAnthropic._sanitize_assistant_messages(payloads)

assert payloads["messages"][1]["content"] == [
{"type": "tool_result", "tool_use_id": "call_00", "content": "one"},
{"type": "text", "text": "continue"},
]


# ---- tool_choice 转换测试 ----


Expand Down
Loading