""" LangChain specific utilities for CopilotKit """ import uuid import json import warnings import asyncio from typing import List, Optional, Any, Union, Dict from typing_extensions import TypedDict from langgraph.graph import MessagesState from langchain_core.messages import ( HumanMessage, SystemMessage, BaseMessage, AIMessage, ToolMessage, ) from langchain_core.runnables import RunnableConfig from langchain_core.callbacks.manager import adispatch_custom_event from langgraph.types import interrupt from .types import Message, IntermediateStateConfig from .exc import CopilotKitMisuseError from .logging import get_logger logger = get_logger(__name__) class CopilotContextItem(TypedDict): """Copilot context item""" description: str value: Any class CopilotKitProperties(TypedDict): """CopilotKit state""" actions: List[Any] context: List[CopilotContextItem] # Private state for CopilotKit middleware intercepted_tool_calls: Any original_ai_message_id: Any class CopilotKitState(MessagesState): """CopilotKit state""" copilotkit: CopilotKitProperties def langchain_messages_to_copilotkit(messages: List[BaseMessage]) -> List[Message]: """ Convert LangChain messages to CopilotKit messages """ result = [] tool_call_names = {} for message in messages: if isinstance(message, AIMessage): for tool_call in message.tool_calls or []: tool_call_names[tool_call["id"]] = tool_call["name"] for message in messages: content = None if hasattr(message, "content"): content = message.content # Content can be a list of content blocks (e.g. Anthropic models). # Extract and concatenate all text parts instead of only taking # the first element. if isinstance(content, list): text_parts = [] for part in content: if isinstance(part, str): text_parts.append(part) elif isinstance(part, dict) and part.get("type") == "text": text_parts.append(part.get("text", "")) elif isinstance(part, dict) and "text" in part: text_parts.append(part.get("text", "")) content = "".join(text_parts) # Anthropic models return a dict with a "text" key if isinstance(content, dict): content = content.get("text", "") if isinstance(message, HumanMessage): result.append( { "role": "user", "content": content, "id": message.id, } ) elif isinstance(message, SystemMessage): result.append( { "role": "system", "content": content, "id": message.id, } ) elif isinstance(message, AIMessage): # Always emit the assistant message, even with empty content. # Tool call entries reference it via parentMessageId; omitting it # orphans tool calls and breaks frontend thread reconstruction. result.append( { "role": "assistant", "content": content if content is not None else "", "id": message.id, } ) if message.tool_calls: for tool_call in message.tool_calls: result.append( { "id": tool_call["id"], "name": tool_call["name"], "arguments": tool_call["args"], "parentMessageId": message.id, } ) elif isinstance(message, ToolMessage): result.append( { "actionExecutionId": message.tool_call_id, "actionName": tool_call_names.get( message.tool_call_id, message.name or "" ), "result": content, "id": message.id, } ) # Create a dictionary to map message ids to their corresponding messages results_dict = { msg["actionExecutionId"]: msg for msg in result if "actionExecutionId" in msg } # since we are splitting multiple tool calls into multiple messages, # we need to reorder the corresponding result messages to be after the tool call reordered_result = [] for msg in result: # add all messages that are not tool call results if not "actionExecutionId" in msg: reordered_result.append(msg) # if the message is a tool call, also add the corresponding result message # immediately after the tool call if "arguments" in msg: msg_id = msg["id"] if msg_id in results_dict: reordered_result.append(results_dict[msg_id]) else: logger.warning("Tool call result message not found for id: %s", msg_id) return reordered_result def copilotkit_customize_config( base_config: Optional[RunnableConfig] = None, *, emit_messages: Optional[bool] = None, emit_tool_calls: Optional[Union[bool, str, List[str]]] = None, emit_intermediate_state: Optional[List[IntermediateStateConfig]] = None, emit_all: Optional[bool] = None, # deprecated ) -> RunnableConfig: """ Customize the LangGraph configuration for use in CopilotKit. To install the CopilotKit SDK, run: ```bash pip install copilotkit ``` ### Examples Disable emitting messages and tool calls: ```python from copilotkit.langgraph import copilotkit_customize_config config = copilotkit_customize_config( config, emit_messages=False, emit_tool_calls=False ) ``` To emit a tool call as streaming LangGraph state, pass the destination key in state, the tool name and optionally the tool argument. (If you don't pass the argument name, all arguments are emitted under the state key.) ```python from copilotkit.langgraph import copilotkit_customize_config config = copilotkit_customize_config( config, emit_intermediate_state=[ { "state_key": "steps", "tool": "SearchTool", "tool_argument": "steps" }, ] ) ``` Parameters ---------- base_config : Optional[RunnableConfig] The LangChain/LangGraph configuration to customize. Pass None to make a new configuration. emit_messages : Optional[bool] Configure how messages are emitted. By default, all messages are emitted. Pass False to disable emitting messages. emit_tool_calls : Optional[Union[bool, str, List[str]]] Configure how tool calls are emitted. By default, all tool calls are emitted. Pass False to disable emitting tool calls. Pass a string or list of strings to emit only specific tool calls. emit_intermediate_state : Optional[List[IntermediateStateConfig]] Lets you emit tool calls as streaming LangGraph state. Returns ------- RunnableConfig The customized LangGraph configuration. """ if emit_all is not None: warnings.warn( "The `emit_all` parameter is deprecated and will be removed in a future version. " "CopilotKit will now emit all messages and tool calls by default.", DeprecationWarning, stacklevel=2, ) metadata = base_config.get("metadata", {}) if base_config else {} if emit_all is True: metadata["copilotkit:emit-tool-calls"] = True metadata["copilotkit:emit-messages"] = True else: if emit_tool_calls is not None: metadata["copilotkit:emit-tool-calls"] = emit_tool_calls if emit_messages is not None: metadata["copilotkit:emit-messages"] = emit_messages if emit_intermediate_state: metadata["copilotkit:emit-intermediate-state"] = emit_intermediate_state base_config = base_config or {} return {**base_config, "metadata": metadata} async def copilotkit_exit(config: RunnableConfig): """ Exits the current agent after the run completes. Calling copilotkit_exit() will not immediately stop the agent. Instead, it signals to CopilotKit to stop the agent after the run completes. ### Examples ```python from copilotkit.langgraph import copilotkit_exit def my_node(state: Any): await copilotkit_exit(config) return state ``` Parameters ---------- config : RunnableConfig The LangGraph configuration. Returns ------- Awaitable[bool] Always return True. """ await adispatch_custom_event( "copilotkit_exit", {}, config=config, ) await asyncio.sleep(0.02) return True async def copilotkit_emit_state(config: RunnableConfig, state: Any): """ Emits intermediate state to CopilotKit. Useful if you have a longer running node and you want to update the user with the current state of the node. ### Examples ```python from copilotkit.langgraph import copilotkit_emit_state for i in range(10): await some_long_running_operation(i) await copilotkit_emit_state(config, {"progress": i}) ``` Parameters ---------- config : RunnableConfig The LangGraph configuration. state : Any The state to emit (Must be JSON serializable). Returns ------- Awaitable[bool] Always return True. """ await adispatch_custom_event( "copilotkit_manually_emit_intermediate_state", state, config=config, ) await asyncio.sleep(0.02) return True async def copilotkit_emit_message(config: RunnableConfig, message: str): """ Manually emits a message to CopilotKit. Useful in longer running nodes to update the user. Important: You still need to return the messages from the node. ### Examples ```python from copilotkit.langgraph import copilotkit_emit_message message = "Step 1 of 10 complete" await copilotkit_emit_message(config, message) # Return the message from the node return { "messages": [AIMessage(content=message)] } ``` Parameters ---------- config : RunnableConfig The LangGraph configuration. message : str The message to emit. Returns ------- Awaitable[bool] Always return True. """ await adispatch_custom_event( "copilotkit_manually_emit_message", {"message": message, "message_id": str(uuid.uuid4()), "role": "assistant"}, config=config, ) await asyncio.shield(asyncio.sleep(0.02)) return True async def copilotkit_emit_tool_call( config: RunnableConfig, *, name: str, args: Dict[str, Any], tool_call_id: Optional[str] = None, ) -> str: """ Manually emits a tool call to CopilotKit. ```python from copilotkit.langgraph import copilotkit_emit_tool_call auto_id = await copilotkit_emit_tool_call(config, name="SearchTool", args={"steps": 10}) # With a custom ID for correlation/idempotency: custom_id = await copilotkit_emit_tool_call(config, name="SearchTool", args={"steps": 10}, tool_call_id="my-custom-id") ``` Parameters ---------- config : RunnableConfig The LangGraph configuration. name : str The name of the tool to emit. args : Dict[str, Any] The arguments to emit. tool_call_id : Optional[str] Optional tool call ID. If not provided, a random UUID is generated. When provided, this ID is used as the toolCallId and parentMessageId in AG-UI protocol events. The caller is responsible for ensuring uniqueness. Returns ------- str The tool call ID used for the emitted tool call. """ if not isinstance(name, str) or not name.strip(): raise CopilotKitMisuseError( "Tool name must be a non-empty string for copilotkit_emit_tool_call" ) if tool_call_id is not None: if not isinstance(tool_call_id, str) or not tool_call_id.strip(): raise CopilotKitMisuseError( "Tool call id must be a non-empty string when provided for copilotkit_emit_tool_call" ) else: tool_call_id = str(uuid.uuid4()) try: json.dumps(args) except (TypeError, ValueError) as e: raise CopilotKitMisuseError( f"Tool arguments for '{name}' are not JSON-serializable: {e}" ) from e await adispatch_custom_event( "copilotkit_manually_emit_tool_call", {"name": name, "args": args, "id": tool_call_id}, config=config, ) # LangGraph's adispatch_custom_event is async but does not guarantee the event # has been flushed to the SSE stream before it returns. Without this sleep, # a subsequent emit can interleave and corrupt event ordering on the client. # Shielded so that task cancellation doesn't prevent us from returning the ID. try: await asyncio.shield(asyncio.sleep(0.02)) except asyncio.CancelledError: logger.warning( "copilotkit_emit_tool_call cancelled during post-dispatch flush for " "tool_call_id=%s; event was already dispatched", tool_call_id, ) raise return tool_call_id def copilotkit_interrupt( message: Optional[str] = None, action: Optional[str] = None, args: Optional[Dict[str, Any]] = None, ): if message is None and action is None: raise ValueError( "Either message or action (and optional arguments) must be provided" ) interrupt_message = None interrupt_values = None answer = None if message is not None: interrupt_values = message interrupt_message = AIMessage(content=message, id=str(uuid.uuid4())) else: tool_id = str(uuid.uuid4()) interrupt_message = AIMessage( content="", tool_calls=[{"id": tool_id, "name": action, "args": args or {}}] ) interrupt_values = {"action": action, "args": args or {}} response = interrupt( { "__copilotkit_interrupt_value__": interrupt_values, "__copilotkit_messages__": [interrupt_message], } ) if isinstance(response, str): answer = response elif isinstance(response, dict): answer = json.dumps(response) elif isinstance(response, list): answer = response[-1].content else: answer = str(response) return answer, response