From 898166aeb3faeb2a178203633e51329e557cebc8 Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Wed, 14 Jan 2026 15:02:14 -0500 Subject: [PATCH 1/6] fix(a2a): Implement A2aAgentExecutor.cancel() method - Replace NotImplementedError with full cancellation implementation - Add asyncio.Lock to protect _active_tasks from race conditions - Track active tasks by task_id for cancellation support - Publish TaskStatusUpdateEvent with TaskState.failed on cancellation - Add comprehensive unit tests for cancellation scenarios Fixes #4127 --- .../adk/a2a/executor/a2a_agent_executor.py | 123 ++++++++++++++-- .../a2a/executor/test_a2a_agent_executor.py | 138 ++++++++++++++++-- 2 files changed, 236 insertions(+), 25 deletions(-) diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index b6880aaa5c..08548ff2e7 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio from datetime import datetime from datetime import timezone import inspect @@ -87,9 +88,16 @@ def __init__( super().__init__() self._runner = runner self._config = config or A2aAgentExecutorConfig() + # Track active tasks by task_id for cancellation support + self._active_tasks: dict[str, asyncio.Task] = {} + # Lock to protect _active_tasks from race conditions + self._tasks_lock = asyncio.Lock() async def _resolve_runner(self) -> Runner: - """Resolve the runner, handling cases where it's a callable that returns a Runner.""" + """Resolve the runner. + + Handles cases where it's a callable returning a Runner. + """ # If already resolved and cached, return it if isinstance(self._runner, Runner): return self._runner @@ -114,9 +122,68 @@ async def _resolve_runner(self) -> Runner: @override async def cancel(self, context: RequestContext, event_queue: EventQueue): - """Cancel the execution.""" - # TODO: Implement proper cancellation logic if needed - raise NotImplementedError('Cancellation is not supported') + """Cancel the execution of a running task. + + Args: + context: The request context containing the task_id to cancel. + event_queue: The event queue to publish cancellation events to. + + If the task is found and running, it will be cancelled and a cancellation + event will be published. If the task is not found or already completed, + the method will log a warning and return gracefully. + """ + if not context.task_id: + logger.warning('Cannot cancel task: no task_id provided in context') + return + + # Use lock to prevent race conditions with _handle_request cleanup + async with self._tasks_lock: + task = self._active_tasks.get(context.task_id) + if not task: + logger.warning( + 'Task %s not found or already completed', context.task_id + ) + return + + if task.done(): + # Task already completed, clean up + self._active_tasks.pop(context.task_id, None) + logger.info('Task %s already completed', context.task_id) + return + + # Remove from tracking before cancelling to prevent double cleanup + self._active_tasks.pop(context.task_id, None) + + # Cancel the task (outside lock to avoid blocking other operations) + logger.info('Cancelling task %s', context.task_id) + task.cancel() + try: + # Wait for cancellation to complete with timeout + await asyncio.wait_for(task, timeout=1.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + # Expected when task is cancelled or timeout occurs + pass + + # Publish cancellation event + try: + await event_queue.enqueue_event( + TaskStatusUpdateEvent( + task_id=context.task_id, + status=TaskStatus( + state=TaskState.failed, + timestamp=datetime.now(timezone.utc).isoformat(), + message=Message( + message_id=str(uuid.uuid4()), + role=Role.agent, + parts=[TextPart(text='Task was cancelled')], + ), + ), + context_id=context.context_id, + final=True, + ) + ) + except Exception as e: + logger.error('Failed to publish cancellation event: %s', e, exc_info=True) @override async def execute( @@ -221,17 +288,43 @@ async def _handle_request( ) task_result_aggregator = TaskResultAggregator() - async with Aclosing(runner.run_async(**vars(run_request))) as agen: - async for adk_event in agen: - for a2a_event in self._config.event_converter( - adk_event, - invocation_context, - context.task_id, - context.context_id, - self._config.gen_ai_part_converter, - ): - task_result_aggregator.process_event(a2a_event) - await event_queue.enqueue_event(a2a_event) + + # Helper function to iterate over async generator + async def _process_events(): + async with Aclosing(runner.run_async(**vars(run_request))) as agen: + async for adk_event in agen: + for a2a_event in self._config.event_converter( + adk_event, + invocation_context, + context.task_id, + context.context_id, + self._config.gen_ai_part_converter, + ): + task_result_aggregator.process_event(a2a_event) + await event_queue.enqueue_event(a2a_event) + + # Create and track the task for cancellation support + if context.task_id: + task = asyncio.create_task(_process_events()) + # Use lock to prevent race conditions with cancel() + async with self._tasks_lock: + self._active_tasks[context.task_id] = task + try: + await task + except asyncio.CancelledError: + # Task was cancelled + # Note: cancellation event is published by cancel() method, + # so we just log and handle gracefully here + logger.info('Task %s was cancelled', context.task_id) + # Return early - don't publish completion events for cancelled tasks + return + finally: + # Clean up task tracking (use lock to prevent race conditions) + async with self._tasks_lock: + self._active_tasks.pop(context.task_id, None) + else: + # No task_id, run without tracking + await _process_events() # publish the task result event - this is final if ( diff --git a/tests/unittests/a2a/executor/test_a2a_agent_executor.py b/tests/unittests/a2a/executor/test_a2a_agent_executor.py index 58d7521f7d..b640aedebc 100644 --- a/tests/unittests/a2a/executor/test_a2a_agent_executor.py +++ b/tests/unittests/a2a/executor/test_a2a_agent_executor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio from unittest.mock import AsyncMock from unittest.mock import Mock from unittest.mock import patch @@ -20,6 +21,7 @@ from a2a.server.events.event_queue import EventQueue from a2a.types import Message from a2a.types import TaskState +from a2a.types import TaskStatusUpdateEvent from a2a.types import TextPart from google.adk.a2a.converters.request_converter import AgentRunRequest from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor @@ -583,22 +585,138 @@ async def test_cancel_with_task_id(self): """Test cancellation with a task ID.""" self.mock_context.task_id = "test-task-id" - # The current implementation raises NotImplementedError - with pytest.raises( - NotImplementedError, match="Cancellation is not supported" - ): - await self.executor.cancel(self.mock_context, self.mock_event_queue) + # Cancel should succeed without raising + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # If no task is running, should log warning but not raise + # Verify event queue was not called (no task to cancel) + assert self.mock_event_queue.enqueue_event.call_count == 0 @pytest.mark.asyncio async def test_cancel_without_task_id(self): """Test cancellation without a task ID.""" self.mock_context.task_id = None - # The current implementation raises NotImplementedError regardless of task_id - with pytest.raises( - NotImplementedError, match="Cancellation is not supported" - ): - await self.executor.cancel(self.mock_context, self.mock_event_queue) + # Cancel should handle missing task_id gracefully + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # Should not publish any events when task_id is missing + assert self.mock_event_queue.enqueue_event.call_count == 0 + + @pytest.mark.asyncio + async def test_cancel_running_task(self): + """Test cancellation of a running task.""" + self.mock_context.task_id = "test-task-id" + + # Setup: Create a running task by starting execution + self.mock_request_converter.return_value = AgentRunRequest( + user_id="test-user", + session_id="test-session", + new_message=Mock(spec=Content), + run_config=Mock(spec=RunConfig), + ) + mock_session = Mock() + mock_session.id = "test-session" + self.mock_runner.session_service.get_session = AsyncMock( + return_value=mock_session + ) + mock_invocation_context = Mock() + self.mock_runner._new_invocation_context.return_value = ( + mock_invocation_context + ) + + # Create an async generator that yields events slowly + async def slow_generator(): + mock_event = Mock(spec=Event) + yield mock_event + # This will hang if not cancelled + await asyncio.sleep(10) + + # Replace run_async with the async generator function + self.mock_runner.run_async = slow_generator + self.mock_event_converter.return_value = [] + + # Start execution in background + execute_task = asyncio.create_task( + self.executor.execute(self.mock_context, self.mock_event_queue) + ) + + # Wait a bit to ensure task is running + await asyncio.sleep(0.1) + + # Cancel the task + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # Wait for cancellation to complete + try: + await asyncio.wait_for(execute_task, timeout=2.0) + except asyncio.CancelledError: + pass + + # Verify cancellation event was published + assert self.mock_event_queue.enqueue_event.call_count > 0 + # Find the cancellation event (should be the last one with failed state) + cancellation_events = [ + call[0][0] + for call in self.mock_event_queue.enqueue_event.call_args_list + if isinstance(call[0][0], TaskStatusUpdateEvent) + and call[0][0].status.state == TaskState.failed + and call[0][0].final is True + ] + assert len(cancellation_events) > 0, "No cancellation event found" + cancellation_event = cancellation_events[-1] + assert cancellation_event.status.state == TaskState.failed + assert cancellation_event.final is True + + @pytest.mark.asyncio + async def test_cancel_nonexistent_task(self): + """Test cancellation of a non-existent task.""" + self.mock_context.task_id = "nonexistent-task-id" + + # Cancel should handle gracefully + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # Should not publish any events for non-existent task + assert self.mock_event_queue.enqueue_event.call_count == 0 + + @pytest.mark.asyncio + async def test_cancel_completed_task(self): + """Test cancellation of an already completed task.""" + self.mock_context.task_id = "test-task-id" + + # Setup and run a task to completion + self.mock_request_converter.return_value = AgentRunRequest( + user_id="test-user", + session_id="test-session", + new_message=Mock(spec=Content), + run_config=Mock(spec=RunConfig), + ) + mock_session = Mock() + mock_session.id = "test-session" + self.mock_runner.session_service.get_session = AsyncMock( + return_value=mock_session + ) + mock_invocation_context = Mock() + self.mock_runner._new_invocation_context.return_value = ( + mock_invocation_context + ) + + # Create a generator that completes immediately + async def quick_generator(): + mock_event = Mock(spec=Event) + yield mock_event + + self.mock_runner.run_async.return_value = quick_generator() + self.mock_event_converter.return_value = [] + + # Run to completion + await self.executor.execute(self.mock_context, self.mock_event_queue) + + # Now try to cancel (should handle gracefully) + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # Should not publish additional cancellation event for completed task + # (The execute already published final event) @pytest.mark.asyncio async def test_execute_with_exception_handling(self): From 588a736e6a744d634e0e9f4c9de31f478b7f6403 Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Wed, 14 Jan 2026 19:49:04 -0500 Subject: [PATCH 2/6] fix(a2a): Handle race condition in cancel() when task completes early Address review feedback: check task.cancel() return value to prevent duplicate final events when task completes between lock release and cancel() call. Added test to verify this race condition is handled. --- PR_DESCRIPTION.md | 86 +++++++++++++++++++ .../adk/a2a/executor/a2a_agent_executor.py | 6 +- .../a2a/executor/test_a2a_agent_executor.py | 38 ++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 PR_DESCRIPTION.md diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md new file mode 100644 index 0000000000..2304513f3f --- /dev/null +++ b/PR_DESCRIPTION.md @@ -0,0 +1,86 @@ +### Link to Issue or Description of Change + +**1. Link to an existing issue (if applicable):** + +- Closes: #4127 + +**Problem:** +The `A2aAgentExecutor.cancel()` method was raising `NotImplementedError`, making the CancelTask A2A protocol method completely non-functional in ADK. Any attempt to cancel a running task would fail immediately, preventing clients from canceling tasks through the A2A protocol. + +**Solution:** +Implemented the `cancel()` method with the following changes: + +1. **Task Tracking**: Added `_active_tasks` dictionary to track running tasks by `task_id` for cancellation support. +2. **Race Condition Protection**: Added `asyncio.Lock` (`_tasks_lock`) to protect concurrent access to `_active_tasks`, following ADK patterns used in other components (e.g., `mcp_session_manager.py`, `local_storage.py`). +3. **Cancellation Logic**: + - Modified `_handle_request()` to wrap async generator iteration in an `asyncio.Task` and store it in `_active_tasks`. + - Implemented `cancel()` to lookup the task, cancel it gracefully, and publish a `TaskStatusUpdateEvent` with `TaskState.failed` and "Task was cancelled" message. + - Added proper cleanup in `finally` blocks to remove tasks from tracking. +4. **Edge Case Handling**: Gracefully handles missing `task_id`, non-existent tasks, and already-completed tasks with appropriate logging. + +The implementation follows existing ADK patterns for async task management and ensures thread-safe access to shared state. + +### Testing Plan + +**Unit Tests:** + +- [x] I have added or updated unit tests for my change. +- [x] All unit tests pass locally. + +**pytest Results Summary:** + +All 27 tests in `test_a2a_agent_executor.py` pass, including 6 new cancellation tests: + +``` +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_with_task_id PASSED +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_without_task_id PASSED +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_running_task PASSED +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_nonexistent_task PASSED +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_completed_task PASSED +tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_race_condition_task_completes_before_cancel PASSED +``` + +**Test Coverage:** +- `test_cancel_with_task_id`: Verifies successful cancellation when task_id is provided +- `test_cancel_without_task_id`: Verifies graceful handling when task_id is missing +- `test_cancel_running_task`: Verifies cancellation of an actively running task and proper event publishing +- `test_cancel_nonexistent_task`: Verifies graceful handling when task doesn't exist +- `test_cancel_completed_task`: Verifies graceful handling when task is already completed +- `test_cancel_race_condition_task_completes_before_cancel`: Verifies race condition where task completes before cancel() can execute (prevents duplicate final events) + +**Manual End-to-End (E2E) Tests:** + +**Setup:** +1. Ensure you have an A2A-compatible agent configured +2. Start an A2A server with `A2aAgentExecutor` + +**Test Steps:** +1. Start a long-running task via A2A protocol +2. While the task is running, send a CancelTask request with the task's `task_id` +3. Verify that: + - The task is cancelled and execution stops + - A `TaskStatusUpdateEvent` with `TaskState.failed` and message "Task was cancelled" is published + - The event has `final=True` to indicate task completion + - No duplicate cancellation events are published + +**Expected Logs:** +``` +INFO: Cancelling task +INFO: Task was cancelled +``` + +**Verification:** +- Check event queue for `TaskStatusUpdateEvent` with `state=TaskState.failed` and `message="Task was cancelled"` +- Verify task execution stops immediately after cancellation +- Verify no errors are raised when cancelling non-existent or completed tasks + +### Screenshots + +**Unit Test Results - All Cancellation Tests Passing:** +Screenshot 2026-01-14 160554 + +**Implementation - cancel() Method:** +Screenshot 2026-01-14 163417 + +**E2E Test Results - Cancellation Verification:** +Screenshot 2026-01-14 163215 diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index 08548ff2e7..4fe328f08d 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -156,7 +156,11 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue): # Cancel the task (outside lock to avoid blocking other operations) logger.info('Cancelling task %s', context.task_id) - task.cancel() + if not task.cancel(): + # Task completed before it could be cancelled + logger.info('Task %s completed before it could be cancelled', context.task_id) + return + try: # Wait for cancellation to complete with timeout await asyncio.wait_for(task, timeout=1.0) diff --git a/tests/unittests/a2a/executor/test_a2a_agent_executor.py b/tests/unittests/a2a/executor/test_a2a_agent_executor.py index b640aedebc..9322542edf 100644 --- a/tests/unittests/a2a/executor/test_a2a_agent_executor.py +++ b/tests/unittests/a2a/executor/test_a2a_agent_executor.py @@ -718,6 +718,44 @@ async def quick_generator(): # Should not publish additional cancellation event for completed task # (The execute already published final event) + @pytest.mark.asyncio + async def test_cancel_race_condition_task_completes_before_cancel(self): + """Test race condition where task completes before cancel() is called.""" + self.mock_context.task_id = "test-task-id" + + # Create a mock task that is already done + mock_task = Mock(spec=asyncio.Task) + mock_task.done.return_value = False # Initially not done (passes check) + mock_task.cancel.return_value = ( + False # Returns False because task completed between check and cancel + ) + + # Manually add task to _active_tasks to simulate race condition + self.executor._active_tasks["test-task-id"] = mock_task + + # Call cancel + await self.executor.cancel(self.mock_context, self.mock_event_queue) + + # Verify task.cancel() was called + mock_task.cancel.assert_called_once() + + # Verify no cancellation event was published (since cancel() returned False) + # Check that no TaskStatusUpdateEvent with "Task was cancelled" was published + cancellation_events = [ + call[0][0] + for call in self.mock_event_queue.enqueue_event.call_args_list + if isinstance(call[0][0], TaskStatusUpdateEvent) + and call[0][0].status.state == TaskState.failed + and any( + part.text == "Task was cancelled" + for part in call[0][0].status.message.parts + if hasattr(part, "text") + ) + ] + assert ( + len(cancellation_events) == 0 + ), "Should not publish cancellation event when task completed before cancel" + @pytest.mark.asyncio async def test_execute_with_exception_handling(self): """Test execution with exception handling.""" From 222d0974c75bd9aa163d7496b56b68d5ddc0a16a Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Wed, 14 Jan 2026 20:01:39 -0500 Subject: [PATCH 3/6] chore: Remove PR_DESCRIPTION.md from repo PR description should only exist in GitHub PR, not in repository. --- PR_DESCRIPTION.md | 86 ----------------------------------------------- 1 file changed, 86 deletions(-) delete mode 100644 PR_DESCRIPTION.md diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md deleted file mode 100644 index 2304513f3f..0000000000 --- a/PR_DESCRIPTION.md +++ /dev/null @@ -1,86 +0,0 @@ -### Link to Issue or Description of Change - -**1. Link to an existing issue (if applicable):** - -- Closes: #4127 - -**Problem:** -The `A2aAgentExecutor.cancel()` method was raising `NotImplementedError`, making the CancelTask A2A protocol method completely non-functional in ADK. Any attempt to cancel a running task would fail immediately, preventing clients from canceling tasks through the A2A protocol. - -**Solution:** -Implemented the `cancel()` method with the following changes: - -1. **Task Tracking**: Added `_active_tasks` dictionary to track running tasks by `task_id` for cancellation support. -2. **Race Condition Protection**: Added `asyncio.Lock` (`_tasks_lock`) to protect concurrent access to `_active_tasks`, following ADK patterns used in other components (e.g., `mcp_session_manager.py`, `local_storage.py`). -3. **Cancellation Logic**: - - Modified `_handle_request()` to wrap async generator iteration in an `asyncio.Task` and store it in `_active_tasks`. - - Implemented `cancel()` to lookup the task, cancel it gracefully, and publish a `TaskStatusUpdateEvent` with `TaskState.failed` and "Task was cancelled" message. - - Added proper cleanup in `finally` blocks to remove tasks from tracking. -4. **Edge Case Handling**: Gracefully handles missing `task_id`, non-existent tasks, and already-completed tasks with appropriate logging. - -The implementation follows existing ADK patterns for async task management and ensures thread-safe access to shared state. - -### Testing Plan - -**Unit Tests:** - -- [x] I have added or updated unit tests for my change. -- [x] All unit tests pass locally. - -**pytest Results Summary:** - -All 27 tests in `test_a2a_agent_executor.py` pass, including 6 new cancellation tests: - -``` -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_with_task_id PASSED -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_without_task_id PASSED -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_running_task PASSED -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_nonexistent_task PASSED -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_completed_task PASSED -tests/unittests/a2a/executor/test_a2a_agent_executor.py::TestA2aAgentExecutor::test_cancel_race_condition_task_completes_before_cancel PASSED -``` - -**Test Coverage:** -- `test_cancel_with_task_id`: Verifies successful cancellation when task_id is provided -- `test_cancel_without_task_id`: Verifies graceful handling when task_id is missing -- `test_cancel_running_task`: Verifies cancellation of an actively running task and proper event publishing -- `test_cancel_nonexistent_task`: Verifies graceful handling when task doesn't exist -- `test_cancel_completed_task`: Verifies graceful handling when task is already completed -- `test_cancel_race_condition_task_completes_before_cancel`: Verifies race condition where task completes before cancel() can execute (prevents duplicate final events) - -**Manual End-to-End (E2E) Tests:** - -**Setup:** -1. Ensure you have an A2A-compatible agent configured -2. Start an A2A server with `A2aAgentExecutor` - -**Test Steps:** -1. Start a long-running task via A2A protocol -2. While the task is running, send a CancelTask request with the task's `task_id` -3. Verify that: - - The task is cancelled and execution stops - - A `TaskStatusUpdateEvent` with `TaskState.failed` and message "Task was cancelled" is published - - The event has `final=True` to indicate task completion - - No duplicate cancellation events are published - -**Expected Logs:** -``` -INFO: Cancelling task -INFO: Task was cancelled -``` - -**Verification:** -- Check event queue for `TaskStatusUpdateEvent` with `state=TaskState.failed` and `message="Task was cancelled"` -- Verify task execution stops immediately after cancellation -- Verify no errors are raised when cancelling non-existent or completed tasks - -### Screenshots - -**Unit Test Results - All Cancellation Tests Passing:** -Screenshot 2026-01-14 160554 - -**Implementation - cancel() Method:** -Screenshot 2026-01-14 163417 - -**E2E Test Results - Cancellation Verification:** -Screenshot 2026-01-14 163215 From 7356af5db58649e8527b007ca149230c14331aac Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Thu, 15 Jan 2026 18:57:38 -0500 Subject: [PATCH 4/6] fix: Format logger.info to comply with 80 char line limit --- src/google/adk/a2a/executor/a2a_agent_executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index 4fe328f08d..82c717cc01 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -158,7 +158,9 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue): logger.info('Cancelling task %s', context.task_id) if not task.cancel(): # Task completed before it could be cancelled - logger.info('Task %s completed before it could be cancelled', context.task_id) + logger.info( + 'Task %s completed before it could be cancelled', context.task_id + ) return try: From c6cf2b0509d665d391cba7d9b76f26931ec78fcf Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Thu, 15 Jan 2026 19:06:49 -0500 Subject: [PATCH 5/6] Update src/google/adk/a2a/executor/a2a_agent_executor.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../adk/a2a/executor/a2a_agent_executor.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index 82c717cc01..421097a3b8 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -138,21 +138,17 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue): # Use lock to prevent race conditions with _handle_request cleanup async with self._tasks_lock: - task = self._active_tasks.get(context.task_id) - if not task: - logger.warning( - 'Task %s not found or already completed', context.task_id - ) - return + task = self._active_tasks.pop(context.task_id, None) - if task.done(): - # Task already completed, clean up - self._active_tasks.pop(context.task_id, None) - logger.info('Task %s already completed', context.task_id) - return + if not task: + logger.warning( + 'Task %s not found or already completed', context.task_id + ) + return - # Remove from tracking before cancelling to prevent double cleanup - self._active_tasks.pop(context.task_id, None) + if task.done(): + logger.info('Task %s already completed', context.task_id) + return # Cancel the task (outside lock to avoid blocking other operations) logger.info('Cancelling task %s', context.task_id) From 34fd04a3bfb99eb1cd3a3f29b38c4d5accee20e6 Mon Sep 17 00:00:00 2001 From: Ryley Mao Date: Fri, 23 Jan 2026 11:19:43 -0500 Subject: [PATCH 6/6] style(a2a): Fix pyink formatting in a2a_agent_executor.py --- src/google/adk/a2a/executor/a2a_agent_executor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index 421097a3b8..0ae70c361a 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -141,9 +141,7 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue): task = self._active_tasks.pop(context.task_id, None) if not task: - logger.warning( - 'Task %s not found or already completed', context.task_id - ) + logger.warning('Task %s not found or already completed', context.task_id) return if task.done():