From 65bf643ce43473b5e7a60e5340f167b2baf6d9aa Mon Sep 17 00:00:00 2001 From: SearchSavior Date: Wed, 28 Jan 2026 20:29:50 -0500 Subject: [PATCH] update streamers with cancel --- src/engine/ov_genai/streamers.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/engine/ov_genai/streamers.py b/src/engine/ov_genai/streamers.py index a2f67208..d293b12f 100644 --- a/src/engine/ov_genai/streamers.py +++ b/src/engine/ov_genai/streamers.py @@ -21,8 +21,15 @@ def __init__(self, decoder_tokenizer, gen_config: OVGenAI_GenConfig): self.since_last_emit: int = 0 # tokens collected since last emit self.last_print_len: int = 0 # length of decoded text we've already emitted self.text_queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue() + self._cancelled = asyncio.Event() # cancellation flag (NOTE(review): asyncio.Event is documented as not thread-safe; confirm how cancel() is invoked across threads) def write(self, token: Union[int, List[int]]) -> openvino_genai.StreamingStatus: + # Check for cancellation first + if self._cancelled.is_set(): + # Signal completion to the queue so the consumer can exit + self.text_queue.put_nowait(None) + return openvino_genai.StreamingStatus.CANCEL + # Normalize input to a list of ints if isinstance(token, list): self.tokens_cache.extend(token) @@ -44,6 +51,14 @@ def write(self, token: Union[int, List[int]]) -> openvino_genai.StreamingStatus: return openvino_genai.StreamingStatus.RUNNING + def cancel(self) -> None: + """Signal cancellation of the streaming generation.""" + self._cancelled.set() + + def is_cancelled(self) -> bool: + """Check if cancellation has been signaled.""" + return self._cancelled.is_set() + def end(self) -> None: # Flush any remaining tokens at the end text = self.decoder_tokenizer.decode(self.tokens_cache)