forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_header_forwarding_middleware.py
More file actions
202 lines (173 loc) · 7.51 KB
/
Copy path_header_forwarding_middleware.py
File metadata and controls
202 lines (173 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Minimal header-forwarding-only AgentMiddleware.
Some showcase demos (reasoning, tool-rendering-reasoning-chain, the
sub-agents in `subagents.py`) intentionally avoid the full
`CopilotKitMiddleware` because they don't need its frontend-tool
injection, App-Context surfacing, or state-note features — they're
minimal demos of LangGraph capabilities.
But every showcase request goes through aimock (the locally-served
LLM mock), and aimock requires the ``x-aimock-context`` header (and
friends) on every ``/v1/responses`` and ``/v1/chat/completions``
request to match the right fixture. Without middleware to populate
the header-forwarding ContextVar from the LangGraph RunnableConfig
``configurable``, those requests go out without the header and aimock
returns 404, breaking the demo.
This middleware does ONLY that header propagation — nothing else.
It reuses copilotkit's own primitives (kept private but exported by
the installed package at the module level) so the propagation logic
is identical to the full middleware. No App-Context injection, no
tool-merging, no state-to-prompt surfacing, no Bedrock message
fix-up.
CVDIAG instrumentation (diagnostic only — DOES NOT change WHERE
headers come from): after the existing
``_extract_forwarded_headers_from_config()`` populates copilotkit's
forwarded-headers ContextVar, we read it back via
``get_forwarded_headers()`` and emit a structured ``CVDIAG`` log line
at the configurable-read boundary recording whether
``x-aimock-context`` actually arrived on the LangGraph configurable
channel (``header_present=false`` is the alarm we are hunting). We
also append this layer's hop tag to ``x-diag-hops`` on the SAME
ContextVar the httpx hook already forwards from — so the breadcrumb
and correlation headers (``x-diag-run-id``, ``x-diag-hops``) ride
along on the outbound LLM call exactly the way ``x-aimock-context``
does, without introducing any new forwarding source.
"""
from __future__ import annotations
import logging
from typing import Any, Awaitable, Callable, Dict
from langchain.agents.middleware import (
AgentMiddleware,
AgentState,
ModelRequest,
ModelResponse,
)
# Reuse the installed copilotkit's existing header-forwarding helpers so
# the behaviour stays bit-identical to the full CopilotKitMiddleware's
# header-propagation step. These are module-level functions in
# copilotkit 0.1.94's copilotkit_lg_middleware module.
from copilotkit.copilotkit_lg_middleware import (
_extract_forwarded_headers_from_config,
_ensure_httpx_hook,
)
# CVDIAG-only: read/append the forwarded-header ContextVar copilotkit
# already populates. set_forwarded_headers is used SOLELY to append the
# diagnostic hop breadcrumb onto the SAME channel x-aimock-context rides;
# it does not introduce a new forwarding source.
from copilotkit.header_propagation import (
get_forwarded_headers,
set_forwarded_headers,
)
logger = logging.getLogger(__name__)
_CVDIAG_COMPONENT = "backend-langgraph-py"
_CVDIAG_HOP_TAG = "backend-langgraph-py"
def _cvdiag(
boundary: str,
headers: Dict[str, str],
status: str,
*,
hop: Any = "-",
error: str = "",
) -> None:
"""Emit a single CVDIAG log line in the shared cross-language convention.
Never logs full header values — only a 12-char prefix of
``x-aimock-context``.
"""
slug = headers.get("x-aimock-context")
header_present = isinstance(slug, str) and len(slug) > 0
run_id = headers.get("x-diag-run-id", "none")
test_id = headers.get("x-test-id", "none")
prefix = slug[:12] if header_present else ""
logger.info(
"CVDIAG component=%s boundary=%s run_id=%s slug=%s "
"header_present=%s header_value_prefix=%s hop=%s status=%s "
"test_id=%s error=%s",
_CVDIAG_COMPONENT,
boundary,
run_id,
slug if header_present else "MISSING",
str(header_present).lower(),
prefix,
hop,
status,
test_id,
error,
)
def _instrument_and_breadcrumb() -> None:
"""Read the configurable-read result, log it, and append the diag hop.
Called immediately AFTER
``_extract_forwarded_headers_from_config()`` has populated the
ContextVar. Reads the headers back, emits the configurable-read
CVDIAG line (wrapping the previously-silent "no x-aimock-context in
configurable" case as an alarm), then — only when x-aimock-context
is present — appends this layer's hop tag to ``x-diag-hops`` on the
SAME ContextVar so the breadcrumb rides the existing forwarding path.
"""
headers = dict(get_forwarded_headers())
has_context = (
isinstance(headers.get("x-aimock-context"), str)
and len(headers.get("x-aimock-context", "")) > 0
)
if has_context:
_cvdiag("configurable-read", headers, "ok")
else:
# The alarm we are hunting: the configurable channel reached this
# middleware without x-aimock-context. Surface it instead of the
# previous silent no-op.
_cvdiag(
"configurable-read",
headers,
"miss" if headers else "error",
error="x-aimock-context-absent-in-configurable"
if headers
else "no-forwarded-headers-in-configurable",
)
# Nothing to breadcrumb onto — do not invent a forwarding source.
return
# Append this layer's hop tag to x-diag-hops on the SAME ContextVar the
# httpx hook forwards from. This rides the existing path; no new source.
existing_hops = headers.get("x-diag-hops", "")
headers["x-diag-hops"] = (
f"{existing_hops},{_CVDIAG_HOP_TAG}"
if isinstance(existing_hops, str) and existing_hops
else _CVDIAG_HOP_TAG
)
set_forwarded_headers(headers)
hop = len([h for h in headers["x-diag-hops"].split(",") if h])
_cvdiag("outbound-llm", headers, "ok", hop=hop)
class HeaderForwardingMiddleware(AgentMiddleware[AgentState, Any]):
"""AgentMiddleware that only forwards inbound x-* headers.
Behaviourally a no-op except for two calls inside both
``wrap_model_call`` and ``awrap_model_call``:
1. ``_extract_forwarded_headers_from_config()`` — read the
``x-*`` keys from the active LangGraph RunnableConfig
(``context`` and ``configurable``) and populate the
header-forwarding ContextVar.
2. ``_ensure_httpx_hook(request.model)`` — install copilotkit's
httpx event hook on the model's underlying HTTP client(s)
so the next outgoing LLM request picks the headers up.
No App-Context injection, no tool-merging, no state-surfacing,
no Bedrock message fix-up — strictly header propagation.
CVDIAG: ``_instrument_and_breadcrumb()`` is inserted between the
two steps purely to OBSERVE the configurable-read boundary and tag
the existing breadcrumb. It does not change where headers come from.
"""
@property
def name(self) -> str:
return "HeaderForwardingMiddleware"
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
_extract_forwarded_headers_from_config()
_instrument_and_breadcrumb()
_ensure_httpx_hook(request.model)
return handler(request)
async def awrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
) -> ModelResponse:
_extract_forwarded_headers_from_config()
_instrument_and_breadcrumb()
_ensure_httpx_hook(request.model)
return await handler(request)