From 3ec3eec69883cc941759a106ed28cc8a576c9f0a Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 12:41:11 -0500 Subject: [PATCH 01/15] add clone session function --- .../adk/sessions/base_session_service.py | 28 +++ .../adk/sessions/database_session_service.py | 42 ++++ .../adk/sessions/in_memory_session_service.py | 44 +++++ .../adk/sessions/sqlite_session_service.py | 49 +++++ .../adk/sessions/vertex_ai_session_service.py | 35 ++++ .../sessions/test_session_service.py | 184 ++++++++++++++++++ 6 files changed, 382 insertions(+) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index f2f6f9f22d..9efbc10e4b 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -102,6 +102,34 @@ async def delete_session( ) -> None: """Deletes a session.""" + @abc.abstractmethod + async def clone_session( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + """Clones a session and its events to a new session. + + Creates a new session with the same events as the source session. + The destination session will have a new ID unless specified. + + Args: + session: The source session to clone. Must include events to be copied. + dst_user_id: The user ID for the destination session. If not provided, + uses the same user_id as the source session. + dst_session_id: The session ID for the destination session. If not + provided, a new ID will be generated. + + Returns: + The newly created session with cloned events. + + Raises: + ValueError: If the source session does not exist. + AlreadyExistsError: If a session with dst_session_id already exists. + """ + async def append_event(self, session: Session, event: Event) -> Event: """Appends an event to a session object.""" if event.partial: diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 863bbfa861..7daaeb73d8 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -415,6 +415,48 @@ async def delete_session( await sql_session.execute(stmt) await sql_session.commit() + @override + async def clone_session( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + await self._prepare_tables() + + # Use source values as defaults + dst_user_id = dst_user_id or session.user_id + + # Create the new session (without state to avoid side effects on app/user + # state) + new_session = await self.create_session( + app_name=session.app_name, + user_id=dst_user_id, + state=copy.deepcopy(session.state), + session_id=dst_session_id, + ) + + # Copy all events from source to destination + schema = self._get_schema_classes() + async with self.database_session_factory() as sql_session: + for event in session.events: + # Create a deep copy of the event and assign new IDs + cloned_event = copy.deepcopy(event) + new_storage_event = schema.StorageEvent.from_event( + new_session, cloned_event + ) + sql_session.add(new_storage_event) + + await sql_session.commit() + + # Return the new session with events + return await self.get_session( + app_name=new_session.app_name, + user_id=new_session.user_id, + session_id=new_session.id, + ) + @override async def append_event(self, session: Session, event: Event) -> Event: await self._prepare_tables() diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 6ba7f0bb01..1f63db1156 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -286,6 +286,50 @@ def _delete_session_impl( self.sessions[app_name][user_id].pop(session_id) + @override + async def clone_session( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + return self._clone_session_impl( + session=session, + dst_user_id=dst_user_id, + dst_session_id=dst_session_id, + ) + + def _clone_session_impl( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + # Use source values as defaults + dst_user_id = dst_user_id or session.user_id + + # Create the new session with copied state + new_session = self._create_session_impl( + app_name=session.app_name, + user_id=dst_user_id, + state=copy.deepcopy(session.state), + session_id=dst_session_id, + ) + + # Get the storage session and copy events + storage_session = self.sessions[session.app_name][dst_user_id][new_session.id] + storage_session.events = copy.deepcopy(session.events) + storage_session.last_update_time = session.last_update_time + + # Return a copy with merged state + return self._get_session_impl( + app_name=new_session.app_name, + user_id=new_session.user_id, + session_id=new_session.id, + ) + @override async def append_event(self, session: Session, event: Event) -> Event: if event.partial: diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 1d9516ec73..817a756ca0 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -356,6 +356,55 @@ async def delete_session( ) await db.commit() + @override + async def clone_session( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + # Use source values as defaults + dst_user_id = dst_user_id or session.user_id + + # Create the new session (without state to avoid side effects on app/user + # state) + new_session = await self.create_session( + app_name=session.app_name, + user_id=dst_user_id, + state=copy.deepcopy(session.state), + session_id=dst_session_id, + ) + + # Copy all events from source to destination + async with self._get_db_connection() as db: + for event in session.events: + # Create a deep copy of the event + cloned_event = copy.deepcopy(event) + await db.execute( + """ + INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + cloned_event.id, + new_session.app_name, + new_session.user_id, + new_session.id, + cloned_event.invocation_id, + cloned_event.timestamp, + cloned_event.model_dump_json(exclude_none=True), + ), + ) + await db.commit() + + # Return the new session with events + return await self.get_session( + app_name=new_session.app_name, + user_id=new_session.user_id, + session_id=new_session.id, + ) + @override async def append_event(self, session: Session, event: Event) -> Event: if event.partial: diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 3f9e514e03..e7a116641a 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -244,6 +244,41 @@ async def delete_session( logger.error('Error deleting session %s: %s', session_id, e) raise + @override + async def clone_session( + self, + *, + session: Session, + dst_user_id: Optional[str] = None, + dst_session_id: Optional[str] = None, + ) -> Session: + if dst_session_id: + raise ValueError( + 'User-provided Session id is not supported for' + ' VertexAISessionService.' + ) + + # Use source values as defaults + dst_user_id = dst_user_id or session.user_id + + # Create the new session with copied state + new_session = await self.create_session( + app_name=session.app_name, + user_id=dst_user_id, + state=session.state, + ) + + # Copy all events from source to destination + for event in session.events: + await self.append_event(new_session, event) + + # Return the new session with events + return await self.get_session( + app_name=new_session.app_name, + user_id=new_session.user_id, + session_id=new_session.id, + ) + @override async def append_event(self, session: Session, event: Event) -> Event: # Update the in-memory session. diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 96d2f38726..248fc449d3 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -603,3 +603,187 @@ async def test_partial_events_are_not_persisted(session_service): app_name=app_name, user_id=user_id, session_id=session.id ) assert len(session_got.events) == 0 + + +@pytest.mark.asyncio +async def test_clone_session_basic(session_service): + """Test basic clone_session functionality.""" + app_name = 'my_app' + user_id = 'user' + + # Create source session with events + source_session = await session_service.create_session( + app_name=app_name, user_id=user_id, state={'key': 'value'} + ) + event1 = Event(invocation_id='inv1', author='user') + event2 = Event(invocation_id='inv2', author='model') + await session_service.append_event(source_session, event1) + await session_service.append_event(source_session, event2) + + # Clone the session + source_session = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id=source_session.id + ) + cloned_session = await session_service.clone_session(session=source_session) + + # Verify the cloned session + assert cloned_session is not None + assert cloned_session.id != source_session.id + assert cloned_session.app_name == source_session.app_name + assert cloned_session.user_id == source_session.user_id + assert cloned_session.state == source_session.state + assert len(cloned_session.events) == 2 + assert cloned_session.events[0].invocation_id == event1.invocation_id + assert cloned_session.events[1].invocation_id == event2.invocation_id + + +@pytest.mark.asyncio +async def test_clone_session_with_different_user_id(session_service): + """Test clone_session with a different destination user_id.""" + app_name = 'my_app' + source_user_id = 'user1' + dest_user_id = 'user2' + + # Create source session + source_session = await session_service.create_session( + app_name=app_name, user_id=source_user_id, state={'key': 'value'} + ) + event = Event(invocation_id='inv1', author='user') + await session_service.append_event(source_session, event) + + # Clone to different user + source_session = await session_service.get_session( + app_name=app_name, user_id=source_user_id, session_id=source_session.id + ) + cloned_session = await session_service.clone_session( + session=source_session, dst_user_id=dest_user_id + ) + + # Verify + assert cloned_session.user_id == dest_user_id + assert cloned_session.app_name == source_session.app_name + assert len(cloned_session.events) == 1 + + # Verify the cloned session is persisted correctly + fetched_session = await session_service.get_session( + app_name=app_name, user_id=dest_user_id, session_id=cloned_session.id + ) + assert fetched_session is not None + assert fetched_session.user_id == dest_user_id + + +@pytest.mark.asyncio +async def test_clone_session_with_custom_session_id(session_service): + """Test clone_session with a custom destination session_id.""" + app_name = 'my_app' + user_id = 'user' + custom_session_id = 'custom_cloned_session' + + # Create source session + source_session = await session_service.create_session( + app_name=app_name, user_id=user_id + ) + event = Event(invocation_id='inv1', author='user') + await session_service.append_event(source_session, event) + + # Clone with custom ID + source_session = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id=source_session.id + ) + cloned_session = await session_service.clone_session( + session=source_session, dst_session_id=custom_session_id + ) + + # Verify + assert cloned_session.id == custom_session_id + assert len(cloned_session.events) == 1 + + +@pytest.mark.asyncio +async def test_clone_session_with_existing_id_raises_error(session_service): + """Test that clone_session raises error if destination session_id exists.""" + app_name = 'my_app' + user_id = 'user' + + # Create source and target sessions + source_session = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='source' + ) + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='existing_target' + ) + + # Attempt to clone to existing session ID + with pytest.raises(AlreadyExistsError): + await session_service.clone_session( + session=source_session, dst_session_id='existing_target' + ) + + +@pytest.mark.asyncio +async def test_clone_session_preserves_event_content(session_service): + """Test that clone_session preserves full event content.""" + app_name = 'my_app' + user_id = 'user' + + # Create source session with detailed event + source_session = await session_service.create_session( + app_name=app_name, user_id=user_id + ) + event = Event( + invocation_id='invocation', + author='user', + content=types.Content(role='user', parts=[types.Part(text='test_text')]), + actions=EventActions( + artifact_delta={'file': 0}, + transfer_to_agent='agent', + ), + ) + await session_service.append_event(source_session, event) + + # Clone the session + source_session = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id=source_session.id + ) + cloned_session = await session_service.clone_session(session=source_session) + + # Verify event content is preserved + assert len(cloned_session.events) == 1 + cloned_event = cloned_session.events[0] + assert cloned_event.invocation_id == event.invocation_id + assert cloned_event.author == event.author + assert cloned_event.content == event.content + assert cloned_event.actions.artifact_delta == event.actions.artifact_delta + assert cloned_event.actions.transfer_to_agent == event.actions.transfer_to_agent + + +@pytest.mark.asyncio +async def test_clone_session_does_not_affect_source(session_service): + """Test that cloning does not modify the source session.""" + app_name = 'my_app' + user_id = 'user' + + # Create source session + source_session = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='source_session' + ) + event = Event(invocation_id='inv1', author='user') + await session_service.append_event(source_session, event) + + original_source = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id='source_session' + ) + original_event_count = len(original_source.events) + + # Clone the session + cloned_session = await session_service.clone_session(session=original_source) + + # Add event to cloned session + new_event = Event(invocation_id='inv2', author='model') + await session_service.append_event(cloned_session, new_event) + + # Verify source is unchanged + source_after_clone = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id='source_session' + ) + assert len(source_after_clone.events) == original_event_count From 0c046521de89d347dcdbd951c6031e43505ed70b Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 13:26:56 -0500 Subject: [PATCH 02/15] add new scenariors --- .../adk/sessions/base_session_service.py | 37 +++-- .../adk/sessions/database_session_service.py | 74 +++++++--- .../adk/sessions/in_memory_session_service.py | 88 +++++++++--- .../adk/sessions/sqlite_session_service.py | 96 +++++++++---- .../adk/sessions/vertex_ai_session_service.py | 69 +++++++-- .../sessions/test_session_service.py | 131 +++++++++++++++--- 6 files changed, 380 insertions(+), 115 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 9efbc10e4b..8fed2ba68b 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -106,28 +106,39 @@ async def delete_session( async def clone_session( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: - """Clones a session and its events to a new session. + """Clones session(s) and their events to a new session. + + This method supports two modes: - Creates a new session with the same events as the source session. - The destination session will have a new ID unless specified. + 1. Single session clone: When `src_session_id` is provided, clones that + specific session to the new session. + + 2. All sessions clone: When `src_session_id` is NOT provided, finds all + sessions for `src_user_id` and merges ALL their events into a single + new session. Args: - session: The source session to clone. Must include events to be copied. - dst_user_id: The user ID for the destination session. If not provided, - uses the same user_id as the source session. - dst_session_id: The session ID for the destination session. If not - provided, a new ID will be generated. + app_name: The name of the app. + src_user_id: The source user ID whose session(s) to clone. + src_session_id: The source session ID to clone. If not provided, all + sessions for the source user will be merged into one new session. + new_user_id: The user ID for the new session. If not provided, uses the + same user_id as the source. + new_session_id: The session ID for the new session. If not provided, a + new ID will be auto-generated (UUID4). Returns: The newly created session with cloned events. Raises: - ValueError: If the source session does not exist. - AlreadyExistsError: If a session with dst_session_id already exists. + ValueError: If no source sessions are found. + AlreadyExistsError: If a session with new_session_id already exists. """ async def append_event(self, session: Session, event: Event) -> Event: diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 7daaeb73d8..91181d948a 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -419,34 +419,72 @@ async def delete_session( async def clone_session( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: await self._prepare_tables() # Use source values as defaults - dst_user_id = dst_user_id or session.user_id + new_user_id = new_user_id or src_user_id + + # Collect source sessions and their events + source_sessions = [] + if src_session_id: + # Single session clone + session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=src_session_id, + ) + if not session: + raise ValueError( + f"Source session {src_session_id} not found for user" + f" {src_user_id}." + ) + source_sessions.append(session) + else: + # All sessions clone - get all sessions for the user + list_response = await self.list_sessions( + app_name=app_name, user_id=src_user_id + ) + if not list_response.sessions: + raise ValueError(f"No sessions found for user {src_user_id}.") + # Fetch each session with events + for sess in list_response.sessions: + full_session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=sess.id, + ) + if full_session: + source_sessions.append(full_session) - # Create the new session (without state to avoid side effects on app/user - # state) + # Merge states from all source sessions + merged_state = {} + for session in source_sessions: + merged_state.update(copy.deepcopy(session.state)) + + # Create the new session (new_session_id=None triggers UUID4 generation) new_session = await self.create_session( - app_name=session.app_name, - user_id=dst_user_id, - state=copy.deepcopy(session.state), - session_id=dst_session_id, + app_name=app_name, + user_id=new_user_id, + state=merged_state, + session_id=new_session_id, ) - # Copy all events from source to destination + # Copy all events from all source sessions to the new session schema = self._get_schema_classes() async with self.database_session_factory() as sql_session: - for event in session.events: - # Create a deep copy of the event and assign new IDs - cloned_event = copy.deepcopy(event) - new_storage_event = schema.StorageEvent.from_event( - new_session, cloned_event - ) - sql_session.add(new_storage_event) + for session in source_sessions: + for event in session.events: + cloned_event = copy.deepcopy(event) + new_storage_event = schema.StorageEvent.from_event( + new_session, cloned_event + ) + sql_session.add(new_storage_event) await sql_session.commit() diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 1f63db1156..3d87ad00aa 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -290,38 +290,88 @@ def _delete_session_impl( async def clone_session( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: return self._clone_session_impl( - session=session, - dst_user_id=dst_user_id, - dst_session_id=dst_session_id, + app_name=app_name, + src_user_id=src_user_id, + src_session_id=src_session_id, + new_user_id=new_user_id, + new_session_id=new_session_id, ) def _clone_session_impl( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: # Use source values as defaults - dst_user_id = dst_user_id or session.user_id + new_user_id = new_user_id or src_user_id + + # Collect source sessions and their events + source_sessions = [] + if src_session_id: + # Single session clone + session = self._get_session_impl( + app_name=app_name, + user_id=src_user_id, + session_id=src_session_id, + ) + if not session: + raise ValueError( + f'Source session {src_session_id} not found for user' + f' {src_user_id}.' + ) + source_sessions.append(session) + else: + # All sessions clone - get all sessions for the user + list_response = self._list_sessions_impl( + app_name=app_name, user_id=src_user_id + ) + if not list_response.sessions: + raise ValueError(f'No sessions found for user {src_user_id}.') + # Fetch each session with events + for sess in list_response.sessions: + full_session = self._get_session_impl( + app_name=app_name, + user_id=src_user_id, + session_id=sess.id, + ) + if full_session: + source_sessions.append(full_session) - # Create the new session with copied state + # Merge states from all source sessions + merged_state = {} + for session in source_sessions: + merged_state.update(copy.deepcopy(session.state)) + + # Create the new session (new_session_id=None triggers UUID4 generation) new_session = self._create_session_impl( - app_name=session.app_name, - user_id=dst_user_id, - state=copy.deepcopy(session.state), - session_id=dst_session_id, + app_name=app_name, + user_id=new_user_id, + state=merged_state, + session_id=new_session_id, ) - # Get the storage session and copy events - storage_session = self.sessions[session.app_name][dst_user_id][new_session.id] - storage_session.events = copy.deepcopy(session.events) - storage_session.last_update_time = session.last_update_time + # Get the storage session and copy all events from all source sessions + storage_session = self.sessions[app_name][new_user_id][new_session.id] + all_events = [] + latest_update_time = 0.0 + for session in source_sessions: + all_events.extend(copy.deepcopy(session.events)) + if session.last_update_time > latest_update_time: + latest_update_time = session.last_update_time + + storage_session.events = all_events + storage_session.last_update_time = latest_update_time # Return a copy with merged state return self._get_session_impl( diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 817a756ca0..82accb54d4 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -360,42 +360,80 @@ async def delete_session( async def clone_session( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: # Use source values as defaults - dst_user_id = dst_user_id or session.user_id + new_user_id = new_user_id or src_user_id + + # Collect source sessions and their events + source_sessions = [] + if src_session_id: + # Single session clone + session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=src_session_id, + ) + if not session: + raise ValueError( + f"Source session {src_session_id} not found for user" + f" {src_user_id}." + ) + source_sessions.append(session) + else: + # All sessions clone - get all sessions for the user + list_response = await self.list_sessions( + app_name=app_name, user_id=src_user_id + ) + if not list_response.sessions: + raise ValueError(f"No sessions found for user {src_user_id}.") + # Fetch each session with events + for sess in list_response.sessions: + full_session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=sess.id, + ) + if full_session: + source_sessions.append(full_session) - # Create the new session (without state to avoid side effects on app/user - # state) + # Merge states from all source sessions + merged_state = {} + for session in source_sessions: + merged_state.update(copy.deepcopy(session.state)) + + # Create the new session (new_session_id=None triggers UUID4 generation) new_session = await self.create_session( - app_name=session.app_name, - user_id=dst_user_id, - state=copy.deepcopy(session.state), - session_id=dst_session_id, + app_name=app_name, + user_id=new_user_id, + state=merged_state, + session_id=new_session_id, ) - # Copy all events from source to destination + # Copy all events from all source sessions to the new session async with self._get_db_connection() as db: - for event in session.events: - # Create a deep copy of the event - cloned_event = copy.deepcopy(event) - await db.execute( - """ - INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - cloned_event.id, - new_session.app_name, - new_session.user_id, - new_session.id, - cloned_event.invocation_id, - cloned_event.timestamp, - cloned_event.model_dump_json(exclude_none=True), - ), - ) + for session in source_sessions: + for event in session.events: + cloned_event = copy.deepcopy(event) + await db.execute( + """ + INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + cloned_event.id, + new_session.app_name, + new_session.user_id, + new_session.id, + cloned_event.invocation_id, + cloned_event.timestamp, + cloned_event.model_dump_json(exclude_none=True), + ), + ) await db.commit() # Return the new session with events diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index e7a116641a..9d379debf2 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -248,29 +248,70 @@ async def delete_session( async def clone_session( self, *, - session: Session, - dst_user_id: Optional[str] = None, - dst_session_id: Optional[str] = None, + app_name: str, + src_user_id: str, + src_session_id: Optional[str] = None, + new_user_id: Optional[str] = None, + new_session_id: Optional[str] = None, ) -> Session: - if dst_session_id: + if new_session_id: raise ValueError( - 'User-provided Session id is not supported for' - ' VertexAISessionService.' + 'User-provided session id (new_session_id) is not supported for' + ' VertexAISessionService. The session ID is auto-generated by the' + ' Vertex AI backend.' ) # Use source values as defaults - dst_user_id = dst_user_id or session.user_id + new_user_id = new_user_id or src_user_id + + # Collect source sessions and their events + source_sessions = [] + if src_session_id: + # Single session clone + session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=src_session_id, + ) + if not session: + raise ValueError( + f'Source session {src_session_id} not found for user' + f' {src_user_id}.' + ) + source_sessions.append(session) + else: + # All sessions clone - get all sessions for the user + list_response = await self.list_sessions( + app_name=app_name, user_id=src_user_id + ) + if not list_response.sessions: + raise ValueError(f'No sessions found for user {src_user_id}.') + # Fetch each session with events + for sess in list_response.sessions: + full_session = await self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=sess.id, + ) + if full_session: + source_sessions.append(full_session) + + # Merge states from all source sessions + merged_state = {} + for session in source_sessions: + merged_state.update(session.state) - # Create the new session with copied state + # Create the new session (ID is auto-generated by Vertex AI backend) new_session = await self.create_session( - app_name=session.app_name, - user_id=dst_user_id, - state=session.state, + app_name=app_name, + user_id=new_user_id, + state=merged_state, ) - # Copy all events from source to destination - for event in session.events: - await self.append_event(new_session, event) + # Copy all events from all source sessions to the new session + for session in source_sessions: + for event in session.events: + await self.append_event(new_session, event) # Return the new session with events return await self.get_session( diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 248fc449d3..c17e994d70 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -607,7 +607,7 @@ async def test_partial_events_are_not_persisted(session_service): @pytest.mark.asyncio async def test_clone_session_basic(session_service): - """Test basic clone_session functionality.""" + """Test basic clone_session functionality with specific session_id.""" app_name = 'my_app' user_id = 'user' @@ -620,18 +620,19 @@ async def test_clone_session_basic(session_service): await session_service.append_event(source_session, event1) await session_service.append_event(source_session, event2) - # Clone the session - source_session = await session_service.get_session( - app_name=app_name, user_id=user_id, session_id=source_session.id + # Clone the session using source identifiers + cloned_session = await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + src_session_id=source_session.id, ) - cloned_session = await session_service.clone_session(session=source_session) # Verify the cloned session assert cloned_session is not None assert cloned_session.id != source_session.id - assert cloned_session.app_name == source_session.app_name - assert cloned_session.user_id == source_session.user_id - assert cloned_session.state == source_session.state + assert cloned_session.app_name == app_name + assert cloned_session.user_id == user_id + assert cloned_session.state == {'key': 'value'} assert len(cloned_session.events) == 2 assert cloned_session.events[0].invocation_id == event1.invocation_id assert cloned_session.events[1].invocation_id == event2.invocation_id @@ -652,16 +653,16 @@ async def test_clone_session_with_different_user_id(session_service): await session_service.append_event(source_session, event) # Clone to different user - source_session = await session_service.get_session( - app_name=app_name, user_id=source_user_id, session_id=source_session.id - ) cloned_session = await session_service.clone_session( - session=source_session, dst_user_id=dest_user_id + app_name=app_name, + src_user_id=source_user_id, + src_session_id=source_session.id, + new_user_id=dest_user_id, ) # Verify assert cloned_session.user_id == dest_user_id - assert cloned_session.app_name == source_session.app_name + assert cloned_session.app_name == app_name assert len(cloned_session.events) == 1 # Verify the cloned session is persisted correctly @@ -687,11 +688,11 @@ async def test_clone_session_with_custom_session_id(session_service): await session_service.append_event(source_session, event) # Clone with custom ID - source_session = await session_service.get_session( - app_name=app_name, user_id=user_id, session_id=source_session.id - ) cloned_session = await session_service.clone_session( - session=source_session, dst_session_id=custom_session_id + app_name=app_name, + src_user_id=user_id, + src_session_id=source_session.id, + new_session_id=custom_session_id, ) # Verify @@ -716,7 +717,10 @@ async def test_clone_session_with_existing_id_raises_error(session_service): # Attempt to clone to existing session ID with pytest.raises(AlreadyExistsError): await session_service.clone_session( - session=source_session, dst_session_id='existing_target' + app_name=app_name, + src_user_id=user_id, + src_session_id=source_session.id, + new_session_id='existing_target', ) @@ -742,10 +746,11 @@ async def test_clone_session_preserves_event_content(session_service): await session_service.append_event(source_session, event) # Clone the session - source_session = await session_service.get_session( - app_name=app_name, user_id=user_id, session_id=source_session.id + cloned_session = await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + src_session_id=source_session.id, ) - cloned_session = await session_service.clone_session(session=source_session) # Verify event content is preserved assert len(cloned_session.events) == 1 @@ -776,7 +781,11 @@ async def test_clone_session_does_not_affect_source(session_service): original_event_count = len(original_source.events) # Clone the session - cloned_session = await session_service.clone_session(session=original_source) + cloned_session = await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + src_session_id='source_session', + ) # Add event to cloned session new_event = Event(invocation_id='inv2', author='model') @@ -787,3 +796,81 @@ async def test_clone_session_does_not_affect_source(session_service): app_name=app_name, user_id=user_id, session_id='source_session' ) assert len(source_after_clone.events) == original_event_count + + +@pytest.mark.asyncio +async def test_clone_all_user_sessions(session_service): + """Test clone_session without src_session_id merges all user sessions.""" + app_name = 'my_app' + source_user_id = 'user1' + dest_user_id = 'user2' + + # Create multiple source sessions for user1 + session1 = await session_service.create_session( + app_name=app_name, + user_id=source_user_id, + session_id='session1', + state={'key1': 'value1'}, + ) + session2 = await session_service.create_session( + app_name=app_name, + user_id=source_user_id, + session_id='session2', + state={'key2': 'value2'}, + ) + + # Add events to each session + event1 = Event(invocation_id='inv1', author='user') + event2 = Event(invocation_id='inv2', author='model') + event3 = Event(invocation_id='inv3', author='user') + await session_service.append_event(session1, event1) + await session_service.append_event(session1, event2) + await session_service.append_event(session2, event3) + + # Clone ALL sessions for user1 to user2 (no src_session_id) + cloned_session = await session_service.clone_session( + app_name=app_name, + src_user_id=source_user_id, + new_user_id=dest_user_id, + new_session_id='merged_session', + ) + + # Verify merged session + assert cloned_session is not None + assert cloned_session.user_id == dest_user_id + assert cloned_session.id == 'merged_session' + # Should have all 3 events from both source sessions + assert len(cloned_session.events) == 3 + # State should be merged from both sessions + assert 'key1' in cloned_session.state + assert 'key2' in cloned_session.state + + +@pytest.mark.asyncio +async def test_clone_session_no_source_raises_error(session_service): + """Test that clone_session raises error if source session not found.""" + app_name = 'my_app' + user_id = 'user' + + # Try to clone non-existent session + with pytest.raises(ValueError, match='not found'): + await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + src_session_id='non_existent_session', + ) + + +@pytest.mark.asyncio +async def test_clone_all_sessions_no_sessions_raises_error(session_service): + """Test clone_session raises error when user has no sessions.""" + app_name = 'my_app' + user_id = 'user_with_no_sessions' + + # Try to clone all sessions for user with no sessions + with pytest.raises(ValueError, match='No sessions found'): + await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + # No src_session_id means clone all sessions + ) From 312cf0811bcba4f19c1011d34fb57bca65b890b2 Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 15:54:24 -0500 Subject: [PATCH 03/15] add dedup --- .../adk/sessions/base_session_service.py | 3 ++ .../adk/sessions/database_session_service.py | 25 ++++++---- .../adk/sessions/in_memory_session_service.py | 12 +++-- .../adk/sessions/sqlite_session_service.py | 47 +++++++++++-------- .../adk/sessions/vertex_ai_session_service.py | 13 ++++- .../sessions/test_session_service.py | 40 ++++++++++++++++ 6 files changed, 108 insertions(+), 32 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 8fed2ba68b..b839c0ee3f 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -123,6 +123,9 @@ async def clone_session( sessions for `src_user_id` and merges ALL their events into a single new session. + Events are automatically deduplicated by event ID - only the first + occurrence of each event ID is kept. + Args: app_name: The name of the app. src_user_id: The source user ID whose session(s) to clone. diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 91181d948a..1b5590acbc 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -475,16 +475,25 @@ async def clone_session( session_id=new_session_id, ) - # Copy all events from all source sessions to the new session + # Collect all events, deduplicating by event ID (first occurrence wins) + all_events = [] + seen_event_ids = set() + for session in source_sessions: + for event in session.events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) + + # Copy events to the new session schema = self._get_schema_classes() async with self.database_session_factory() as sql_session: - for session in source_sessions: - for event in session.events: - cloned_event = copy.deepcopy(event) - new_storage_event = schema.StorageEvent.from_event( - new_session, cloned_event - ) - sql_session.add(new_storage_event) + for event in all_events: + cloned_event = copy.deepcopy(event) + new_storage_event = schema.StorageEvent.from_event( + new_session, cloned_event + ) + sql_session.add(new_storage_event) await sql_session.commit() diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 3d87ad00aa..35b2afaa95 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -361,15 +361,21 @@ def _clone_session_impl( session_id=new_session_id, ) - # Get the storage session and copy all events from all source sessions - storage_session = self.sessions[app_name][new_user_id][new_session.id] + # Collect all events, deduplicating by event ID (first occurrence wins) all_events = [] + seen_event_ids = set() latest_update_time = 0.0 for session in source_sessions: - all_events.extend(copy.deepcopy(session.events)) + for event in session.events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(copy.deepcopy(event)) if session.last_update_time > latest_update_time: latest_update_time = session.last_update_time + # Get the storage session and set events + storage_session = self.sessions[app_name][new_user_id][new_session.id] storage_session.events = all_events storage_session.last_update_time = latest_update_time diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 82accb54d4..eab5aaece1 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -414,26 +414,35 @@ async def clone_session( session_id=new_session_id, ) - # Copy all events from all source sessions to the new session + # Collect all events, deduplicating by event ID (first occurrence wins) + all_events = [] + seen_event_ids = set() + for session in source_sessions: + for event in session.events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) + + # Copy events to the new session async with self._get_db_connection() as db: - for session in source_sessions: - for event in session.events: - cloned_event = copy.deepcopy(event) - await db.execute( - """ - INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - cloned_event.id, - new_session.app_name, - new_session.user_id, - new_session.id, - cloned_event.invocation_id, - cloned_event.timestamp, - cloned_event.model_dump_json(exclude_none=True), - ), - ) + for event in all_events: + cloned_event = copy.deepcopy(event) + await db.execute( + """ + INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + cloned_event.id, + new_session.app_name, + new_session.user_id, + new_session.id, + cloned_event.invocation_id, + cloned_event.timestamp, + cloned_event.model_dump_json(exclude_none=True), + ), + ) await db.commit() # Return the new session with events diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 9d379debf2..7a2a56b077 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -308,10 +308,19 @@ async def clone_session( state=merged_state, ) - # Copy all events from all source sessions to the new session + # Collect all events, deduplicating by event ID (first occurrence wins) + all_events = [] + seen_event_ids = set() for session in source_sessions: for event in session.events: - await self.append_event(new_session, event) + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) + + # Copy events to the new session + for event in all_events: + await self.append_event(new_session, event) # Return the new session with events return await self.get_session( diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index c17e994d70..e1f1789954 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -874,3 +874,43 @@ async def test_clone_all_sessions_no_sessions_raises_error(session_service): src_user_id=user_id, # No src_session_id means clone all sessions ) + + +@pytest.mark.asyncio +async def test_clone_session_deduplicates_events(session_service): + """Test clone_session automatically deduplicates events by ID.""" + app_name = 'my_app' + user_id = 'user' + + # Create two sessions with some events having the same ID + session1 = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='session1' + ) + session2 = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='session2' + ) + + # Create events - event1 and event3 have the same ID (duplicate) + event1 = Event(id='shared_event_id', invocation_id='inv1', author='user') + event2 = Event(id='unique_event_1', invocation_id='inv2', author='model') + event3 = Event(id='shared_event_id', invocation_id='inv3', author='user') + event4 = Event(id='unique_event_2', invocation_id='inv4', author='model') + + await session_service.append_event(session1, event1) + await session_service.append_event(session1, event2) + await session_service.append_event(session2, event3) + await session_service.append_event(session2, event4) + + # Clone - should have 3 events (duplicate automatically removed) + cloned_session = await session_service.clone_session( + app_name=app_name, + src_user_id=user_id, + ) + assert len(cloned_session.events) == 3 + # Verify the unique event IDs + event_ids = [e.id for e in cloned_session.events] + assert 'shared_event_id' in event_ids + assert 'unique_event_1' in event_ids + assert 'unique_event_2' in event_ids + # Count occurrences - shared_event_id should appear only once + assert event_ids.count('shared_event_id') == 1 From 063c0669bafa2990e3d28d0700c9ef6c1ce31703 Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 16:21:21 -0500 Subject: [PATCH 04/15] address feedback --- .../adk/sessions/database_session_service.py | 9 +++------ .../adk/sessions/in_memory_session_service.py | 10 ++++------ .../adk/sessions/sqlite_session_service.py | 9 +++------ .../adk/sessions/vertex_ai_session_service.py | 18 ++++++++---------- 4 files changed, 18 insertions(+), 28 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 1b5590acbc..d911c12a71 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -497,12 +497,9 @@ async def clone_session( await sql_session.commit() - # Return the new session with events - return await self.get_session( - app_name=new_session.app_name, - user_id=new_session.user_id, - session_id=new_session.id, - ) + # Return the new session with events (avoid redundant DB query) + new_session.events = all_events + return new_session @override async def append_event(self, session: Session, event: Event) -> Event: diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 35b2afaa95..86f95b27d1 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -379,12 +379,10 @@ def _clone_session_impl( storage_session.events = all_events storage_session.last_update_time = latest_update_time - # Return a copy with merged state - return self._get_session_impl( - app_name=new_session.app_name, - user_id=new_session.user_id, - session_id=new_session.id, - ) + # Return the new session with events (avoid redundant lookup) + new_session.events = all_events + new_session.last_update_time = latest_update_time + return new_session @override async def append_event(self, session: Session, event: Event) -> Event: diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index eab5aaece1..1147ac0c29 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -445,12 +445,9 @@ async def clone_session( ) await db.commit() - # Return the new session with events - return await self.get_session( - app_name=new_session.app_name, - user_id=new_session.user_id, - session_id=new_session.id, - ) + # Return the new session with events (avoid redundant DB query) + new_session.events = all_events + return new_session @override async def append_event(self, session: Session, event: Event) -> Event: diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 7a2a56b077..1060ae090c 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -14,6 +14,7 @@ from __future__ import annotations import asyncio +import copy import datetime import json import logging @@ -296,10 +297,10 @@ async def clone_session( if full_session: source_sessions.append(full_session) - # Merge states from all source sessions + # Merge states from all source sessions (deep copy to avoid mutations) merged_state = {} for session in source_sessions: - merged_state.update(session.state) + merged_state.update(copy.deepcopy(session.state)) # Create the new session (ID is auto-generated by Vertex AI backend) new_session = await self.create_session( @@ -318,16 +319,13 @@ async def clone_session( seen_event_ids.add(event.id) all_events.append(event) - # Copy events to the new session + # Copy events to the new session (deep copy to avoid mutations) for event in all_events: - await self.append_event(new_session, event) + await self.append_event(new_session, copy.deepcopy(event)) - # Return the new session with events - return await self.get_session( - app_name=new_session.app_name, - user_id=new_session.user_id, - session_id=new_session.id, - ) + # Return the new session with events (already populated via append_event) + new_session.events = all_events + return new_session @override async def append_event(self, session: Session, event: Event) -> Event: From 12b994c126090421691d9cdab79df7010b08f83c Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 20:32:01 -0500 Subject: [PATCH 05/15] address n+1 query problem --- .../adk/sessions/database_session_service.py | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index d911c12a71..7e163a9fab 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -430,10 +430,12 @@ async def clone_session( # Use source values as defaults new_user_id = new_user_id or src_user_id + schema = self._get_schema_classes() + # Collect source sessions and their events source_sessions = [] if src_session_id: - # Single session clone + # Single session clone - use get_session (no N+1 issue) session = await self.get_session( app_name=app_name, user_id=src_user_id, @@ -446,21 +448,40 @@ async def clone_session( ) source_sessions.append(session) else: - # All sessions clone - get all sessions for the user + # All sessions clone - optimized to avoid N+1 query problem + # Step 1: Get all sessions with state (no events) list_response = await self.list_sessions( app_name=app_name, user_id=src_user_id ) if not list_response.sessions: raise ValueError(f"No sessions found for user {src_user_id}.") - # Fetch each session with events - for sess in list_response.sessions: - full_session = await self.get_session( - app_name=app_name, - user_id=src_user_id, - session_id=sess.id, + + session_ids = [sess.id for sess in list_response.sessions] + + # Step 2: Fetch ALL events for all session IDs in a single query + async with self.database_session_factory() as sql_session: + stmt = ( + select(schema.StorageEvent) + .filter(schema.StorageEvent.app_name == app_name) + .filter(schema.StorageEvent.user_id == src_user_id) + .filter(schema.StorageEvent.session_id.in_(session_ids)) + .order_by(schema.StorageEvent.timestamp.asc()) ) - if full_session: - source_sessions.append(full_session) + result = await sql_session.execute(stmt) + all_storage_events = result.scalars().all() + + # Step 3: Map events back to sessions + events_by_session_id = {} + for storage_event in all_storage_events: + sid = storage_event.session_id + if sid not in events_by_session_id: + events_by_session_id[sid] = [] + events_by_session_id[sid].append(storage_event.to_event()) + + # Build full session objects with events + for sess in list_response.sessions: + sess.events = events_by_session_id.get(sess.id, []) + source_sessions.append(sess) # Merge states from all source sessions merged_state = {} @@ -486,7 +507,6 @@ async def clone_session( all_events.append(event) # Copy events to the new session - schema = self._get_schema_classes() async with self.database_session_factory() as sql_session: for event in all_events: cloned_event = copy.deepcopy(event) From 31d085805b91f93c065496690f1b223770914bff Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 20:36:13 -0500 Subject: [PATCH 06/15] n+1 fix --- .../adk/sessions/in_memory_session_service.py | 31 +++++++------- .../adk/sessions/sqlite_session_service.py | 40 ++++++++++++++----- .../adk/sessions/vertex_ai_session_service.py | 22 +++++----- 3 files changed, 60 insertions(+), 33 deletions(-) diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 86f95b27d1..b88e2dbd27 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -332,21 +332,24 @@ def _clone_session_impl( ) source_sessions.append(session) else: - # All sessions clone - get all sessions for the user - list_response = self._list_sessions_impl( - app_name=app_name, user_id=src_user_id - ) - if not list_response.sessions: + # All sessions clone - optimized direct access to avoid N+1 lookups + if ( + app_name not in self.sessions + or src_user_id not in self.sessions[app_name] + ): raise ValueError(f'No sessions found for user {src_user_id}.') - # Fetch each session with events - for sess in list_response.sessions: - full_session = self._get_session_impl( - app_name=app_name, - user_id=src_user_id, - session_id=sess.id, - ) - if full_session: - source_sessions.append(full_session) + + user_sessions = self.sessions[app_name][src_user_id] + if not user_sessions: + raise ValueError(f'No sessions found for user {src_user_id}.') + + # Directly access storage sessions and build full session objects + for session_id, storage_session in user_sessions.items(): + # Deep copy the session to avoid mutations + copied_session = copy.deepcopy(storage_session) + # Merge state with app and user state + copied_session = self._merge_state(app_name, src_user_id, copied_session) + source_sessions.append(copied_session) # Merge states from all source sessions merged_state = {} diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 1147ac0c29..8e285426e3 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -372,7 +372,7 @@ async def clone_session( # Collect source sessions and their events source_sessions = [] if src_session_id: - # Single session clone + # Single session clone - use get_session (no N+1 issue) session = await self.get_session( app_name=app_name, user_id=src_user_id, @@ -385,21 +385,41 @@ async def clone_session( ) source_sessions.append(session) else: - # All sessions clone - get all sessions for the user + # All sessions clone - optimized to avoid N+1 query problem + # Step 1: Get all sessions with state (no events) list_response = await self.list_sessions( app_name=app_name, user_id=src_user_id ) if not list_response.sessions: raise ValueError(f"No sessions found for user {src_user_id}.") - # Fetch each session with events - for sess in list_response.sessions: - full_session = await self.get_session( - app_name=app_name, - user_id=src_user_id, - session_id=sess.id, + + session_ids = [sess.id for sess in list_response.sessions] + + # Step 2: Fetch ALL events for all session IDs in a single query + async with self._get_db_connection() as db: + placeholders = ",".join("?" * len(session_ids)) + query = f""" + SELECT session_id, event_data FROM events + WHERE app_name=? AND user_id=? AND session_id IN ({placeholders}) + ORDER BY timestamp ASC + """ + params = [app_name, src_user_id] + session_ids + event_rows = await db.execute_fetchall(query, params) + + # Step 3: Map events back to sessions + events_by_session_id = {} + for row in event_rows: + sid = row["session_id"] + if sid not in events_by_session_id: + events_by_session_id[sid] = [] + events_by_session_id[sid].append( + Event.model_validate_json(row["event_data"]) ) - if full_session: - source_sessions.append(full_session) + + # Build full session objects with events + for sess in list_response.sessions: + sess.events = events_by_session_id.get(sess.id, []) + source_sessions.append(sess) # Merge states from all source sessions merged_state = {} diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 1060ae090c..361aef457b 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -287,15 +287,19 @@ async def clone_session( ) if not list_response.sessions: raise ValueError(f'No sessions found for user {src_user_id}.') - # Fetch each session with events - for sess in list_response.sessions: - full_session = await self.get_session( - app_name=app_name, - user_id=src_user_id, - session_id=sess.id, - ) - if full_session: - source_sessions.append(full_session) + + # Fetch all sessions with events in parallel using asyncio.gather + # (Vertex AI API doesn't support batch retrieval, so we parallelize) + fetch_tasks = [ + self.get_session( + app_name=app_name, + user_id=src_user_id, + session_id=sess.id, + ) + for sess in list_response.sessions + ] + fetched_sessions = await asyncio.gather(*fetch_tasks) + source_sessions = [s for s in fetched_sessions if s is not None] # Merge states from all source sessions (deep copy to avoid mutations) merged_state = {} From cb1e3a6f2f66376624d0f03b9d2b00f7f79bb25f Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 21:00:44 -0500 Subject: [PATCH 07/15] add sorting --- .../adk/sessions/database_session_service.py | 36 +++++++----- .../adk/sessions/in_memory_session_service.py | 30 ++++++---- .../adk/sessions/sqlite_session_service.py | 56 +++++++++++-------- .../adk/sessions/vertex_ai_session_service.py | 22 +++++--- 4 files changed, 89 insertions(+), 55 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 7e163a9fab..7d4ebb4c28 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -483,6 +483,9 @@ async def clone_session( sess.events = events_by_session_id.get(sess.id, []) source_sessions.append(sess) + # Sort sessions by update time for deterministic state merging + source_sessions.sort(key=lambda s: s.last_update_time) + # Merge states from all source sessions merged_state = {} for session in source_sessions: @@ -496,25 +499,28 @@ async def clone_session( session_id=new_session_id, ) - # Collect all events, deduplicating by event ID (first occurrence wins) + # Collect all events, sort by timestamp, then deduplicate + # to ensure chronological "first occurrence wins" + all_source_events = [] + for session in source_sessions: + all_source_events.extend(session.events) + all_source_events.sort(key=lambda e: e.timestamp) + all_events = [] seen_event_ids = set() - for session in source_sessions: - for event in session.events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) + for event in all_source_events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) - # Copy events to the new session + # Copy events to the new session using bulk insert async with self.database_session_factory() as sql_session: - for event in all_events: - cloned_event = copy.deepcopy(event) - new_storage_event = schema.StorageEvent.from_event( - new_session, cloned_event - ) - sql_session.add(new_storage_event) - + new_storage_events = [ + schema.StorageEvent.from_event(new_session, copy.deepcopy(event)) + for event in all_events + ] + sql_session.add_all(new_storage_events) await sql_session.commit() # Return the new session with events (avoid redundant DB query) diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index b88e2dbd27..1ac3ef44b4 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -351,6 +351,9 @@ def _clone_session_impl( copied_session = self._merge_state(app_name, src_user_id, copied_session) source_sessions.append(copied_session) + # Sort sessions by update time for deterministic state merging + source_sessions.sort(key=lambda s: s.last_update_time) + # Merge states from all source sessions merged_state = {} for session in source_sessions: @@ -364,18 +367,25 @@ def _clone_session_impl( session_id=new_session_id, ) - # Collect all events, deduplicating by event ID (first occurrence wins) + # Collect all events, sort by timestamp, then deduplicate + # to ensure chronological "first occurrence wins" + all_source_events = [] + for session in source_sessions: + all_source_events.extend(session.events) + all_source_events.sort(key=lambda e: e.timestamp) + all_events = [] seen_event_ids = set() - latest_update_time = 0.0 - for session in source_sessions: - for event in session.events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(copy.deepcopy(event)) - if session.last_update_time > latest_update_time: - latest_update_time = session.last_update_time + for event in all_source_events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(copy.deepcopy(event)) + + # Get latest update time from sorted sessions + latest_update_time = ( + source_sessions[-1].last_update_time if source_sessions else 0.0 + ) # Get the storage session and set events storage_session = self.sessions[app_name][new_user_id][new_session.id] diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 8e285426e3..86977d2bda 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -421,6 +421,9 @@ async def clone_session( sess.events = events_by_session_id.get(sess.id, []) source_sessions.append(sess) + # Sort sessions by update time for deterministic state merging + source_sessions.sort(key=lambda s: s.last_update_time) + # Merge states from all source sessions merged_state = {} for session in source_sessions: @@ -434,35 +437,42 @@ async def clone_session( session_id=new_session_id, ) - # Collect all events, deduplicating by event ID (first occurrence wins) + # Collect all events, sort by timestamp, then deduplicate + # to ensure chronological "first occurrence wins" + all_source_events = [] + for session in source_sessions: + all_source_events.extend(session.events) + all_source_events.sort(key=lambda e: e.timestamp) + all_events = [] seen_event_ids = set() - for session in source_sessions: - for event in session.events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) + for event in all_source_events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) - # Copy events to the new session + # Copy events to the new session using bulk insert async with self._get_db_connection() as db: + event_params = [] for event in all_events: cloned_event = copy.deepcopy(event) - await db.execute( - """ - INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - cloned_event.id, - new_session.app_name, - new_session.user_id, - new_session.id, - cloned_event.invocation_id, - cloned_event.timestamp, - cloned_event.model_dump_json(exclude_none=True), - ), - ) + event_params.append(( + cloned_event.id, + new_session.app_name, + new_session.user_id, + new_session.id, + cloned_event.invocation_id, + cloned_event.timestamp, + cloned_event.model_dump_json(exclude_none=True), + )) + await db.executemany( + """ + INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + event_params, + ) await db.commit() # Return the new session with events (avoid redundant DB query) diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 361aef457b..2951a02e22 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -301,6 +301,9 @@ async def clone_session( fetched_sessions = await asyncio.gather(*fetch_tasks) source_sessions = [s for s in fetched_sessions if s is not None] + # Sort sessions by update time for deterministic state merging + source_sessions.sort(key=lambda s: s.last_update_time) + # Merge states from all source sessions (deep copy to avoid mutations) merged_state = {} for session in source_sessions: @@ -313,15 +316,20 @@ async def clone_session( state=merged_state, ) - # Collect all events, deduplicating by event ID (first occurrence wins) + # Collect all events, sort by timestamp, then deduplicate + # to ensure chronological "first occurrence wins" + all_source_events = [] + for session in source_sessions: + all_source_events.extend(session.events) + all_source_events.sort(key=lambda e: e.timestamp) + all_events = [] seen_event_ids = set() - for session in source_sessions: - for event in session.events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) + for event in all_source_events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) # Copy events to the new session (deep copy to avoid mutations) for event in all_events: From d67baecb20740f2c7d641c5c720d49ae3423ad7f Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 21:21:51 -0500 Subject: [PATCH 08/15] consolidate duplicate logic to _prepare_sessions_for_cloning helper function --- .../adk/sessions/base_session_service.py | 47 +++++++++++++++++++ .../adk/sessions/database_session_service.py | 26 ++-------- .../adk/sessions/in_memory_session_service.py | 28 +++-------- .../adk/sessions/sqlite_session_service.py | 26 ++-------- .../adk/sessions/vertex_ai_session_service.py | 26 ++-------- 5 files changed, 65 insertions(+), 88 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index b839c0ee3f..9ecff370a0 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -15,6 +15,7 @@ from __future__ import annotations import abc +import copy from typing import Any from typing import Optional @@ -144,6 +145,52 @@ async def clone_session( AlreadyExistsError: If a session with new_session_id already exists. """ + def _prepare_sessions_for_cloning( + self, source_sessions: list[Session] + ) -> tuple[dict[str, Any], list[Event]]: + """Prepares source sessions for cloning by merging states and deduplicating events. + + This is a shared helper method used by all clone_session implementations + to ensure consistent behavior across different session service backends. + + The method: + 1. Sorts sessions by last_update_time for deterministic state merging + 2. Merges states from all sessions (later sessions overwrite earlier ones) + 3. Collects all events, sorts by timestamp, and deduplicates by event ID + + Args: + source_sessions: List of source sessions to process. + + Returns: + A tuple of (merged_state, deduplicated_events): + - merged_state: Combined state from all sessions (deep copied) + - deduplicated_events: Chronologically sorted, deduplicated events + """ + # Sort sessions by update time for deterministic state merging + source_sessions.sort(key=lambda s: s.last_update_time) + + # Merge states from all source sessions + merged_state: dict[str, Any] = {} + for session in source_sessions: + merged_state.update(copy.deepcopy(session.state)) + + # Collect all events, sort by timestamp, then deduplicate + # to ensure chronological "first occurrence wins" + all_source_events: list[Event] = [] + for session in source_sessions: + all_source_events.extend(session.events) + all_source_events.sort(key=lambda e: e.timestamp) + + all_events: list[Event] = [] + seen_event_ids: set[str] = set() + for event in all_source_events: + if event.id in seen_event_ids: + continue + seen_event_ids.add(event.id) + all_events.append(event) + + return merged_state, all_events + async def append_event(self, session: Session, event: Event) -> Event: """Appends an event to a session object.""" if event.partial: diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 7d4ebb4c28..9040084eb0 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -483,13 +483,10 @@ async def clone_session( sess.events = events_by_session_id.get(sess.id, []) source_sessions.append(sess) - # Sort sessions by update time for deterministic state merging - source_sessions.sort(key=lambda s: s.last_update_time) - - # Merge states from all source sessions - merged_state = {} - for session in source_sessions: - merged_state.update(copy.deepcopy(session.state)) + # Use shared helper for state merging and event deduplication + merged_state, all_events = self._prepare_sessions_for_cloning( + source_sessions + ) # Create the new session (new_session_id=None triggers UUID4 generation) new_session = await self.create_session( @@ -499,21 +496,6 @@ async def clone_session( session_id=new_session_id, ) - # Collect all events, sort by timestamp, then deduplicate - # to ensure chronological "first occurrence wins" - all_source_events = [] - for session in source_sessions: - all_source_events.extend(session.events) - all_source_events.sort(key=lambda e: e.timestamp) - - all_events = [] - seen_event_ids = set() - for event in all_source_events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) - # Copy events to the new session using bulk insert async with self.database_session_factory() as sql_session: new_storage_events = [ diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 1ac3ef44b4..5e34d0a9eb 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -351,13 +351,12 @@ def _clone_session_impl( copied_session = self._merge_state(app_name, src_user_id, copied_session) source_sessions.append(copied_session) - # Sort sessions by update time for deterministic state merging - source_sessions.sort(key=lambda s: s.last_update_time) - - # Merge states from all source sessions - merged_state = {} - for session in source_sessions: - merged_state.update(copy.deepcopy(session.state)) + # Use shared helper for state merging and event deduplication + merged_state, all_events = self._prepare_sessions_for_cloning( + source_sessions + ) + # Deep copy events for in-memory storage isolation + all_events = [copy.deepcopy(event) for event in all_events] # Create the new session (new_session_id=None triggers UUID4 generation) new_session = self._create_session_impl( @@ -367,21 +366,6 @@ def _clone_session_impl( session_id=new_session_id, ) - # Collect all events, sort by timestamp, then deduplicate - # to ensure chronological "first occurrence wins" - all_source_events = [] - for session in source_sessions: - all_source_events.extend(session.events) - all_source_events.sort(key=lambda e: e.timestamp) - - all_events = [] - seen_event_ids = set() - for event in all_source_events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(copy.deepcopy(event)) - # Get latest update time from sorted sessions latest_update_time = ( source_sessions[-1].last_update_time if source_sessions else 0.0 diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 86977d2bda..ac5fa8acfd 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -421,13 +421,10 @@ async def clone_session( sess.events = events_by_session_id.get(sess.id, []) source_sessions.append(sess) - # Sort sessions by update time for deterministic state merging - source_sessions.sort(key=lambda s: s.last_update_time) - - # Merge states from all source sessions - merged_state = {} - for session in source_sessions: - merged_state.update(copy.deepcopy(session.state)) + # Use shared helper for state merging and event deduplication + merged_state, all_events = self._prepare_sessions_for_cloning( + source_sessions + ) # Create the new session (new_session_id=None triggers UUID4 generation) new_session = await self.create_session( @@ -437,21 +434,6 @@ async def clone_session( session_id=new_session_id, ) - # Collect all events, sort by timestamp, then deduplicate - # to ensure chronological "first occurrence wins" - all_source_events = [] - for session in source_sessions: - all_source_events.extend(session.events) - all_source_events.sort(key=lambda e: e.timestamp) - - all_events = [] - seen_event_ids = set() - for event in all_source_events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) - # Copy events to the new session using bulk insert async with self._get_db_connection() as db: event_params = [] diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 2951a02e22..9048617149 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -301,13 +301,10 @@ async def clone_session( fetched_sessions = await asyncio.gather(*fetch_tasks) source_sessions = [s for s in fetched_sessions if s is not None] - # Sort sessions by update time for deterministic state merging - source_sessions.sort(key=lambda s: s.last_update_time) - - # Merge states from all source sessions (deep copy to avoid mutations) - merged_state = {} - for session in source_sessions: - merged_state.update(copy.deepcopy(session.state)) + # Use shared helper for state merging and event deduplication + merged_state, all_events = self._prepare_sessions_for_cloning( + source_sessions + ) # Create the new session (ID is auto-generated by Vertex AI backend) new_session = await self.create_session( @@ -316,21 +313,6 @@ async def clone_session( state=merged_state, ) - # Collect all events, sort by timestamp, then deduplicate - # to ensure chronological "first occurrence wins" - all_source_events = [] - for session in source_sessions: - all_source_events.extend(session.events) - all_source_events.sort(key=lambda e: e.timestamp) - - all_events = [] - seen_event_ids = set() - for event in all_source_events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) - # Copy events to the new session (deep copy to avoid mutations) for event in all_events: await self.append_event(new_session, copy.deepcopy(event)) From 42e7d364511155de44a527b26f4e40e35faa388d Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 21:30:48 -0500 Subject: [PATCH 09/15] explicit max calculation --- src/google/adk/sessions/base_session_service.py | 7 ++++--- src/google/adk/sessions/in_memory_session_service.py | 4 ++-- src/google/adk/sessions/vertex_ai_session_service.py | 3 +++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 9ecff370a0..4c073b77e4 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -167,17 +167,18 @@ def _prepare_sessions_for_cloning( - deduplicated_events: Chronologically sorted, deduplicated events """ # Sort sessions by update time for deterministic state merging - source_sessions.sort(key=lambda s: s.last_update_time) + # Use sorted() to avoid modifying the input list in-place + sorted_sessions = sorted(source_sessions, key=lambda s: s.last_update_time) # Merge states from all source sessions merged_state: dict[str, Any] = {} - for session in source_sessions: + for session in sorted_sessions: merged_state.update(copy.deepcopy(session.state)) # Collect all events, sort by timestamp, then deduplicate # to ensure chronological "first occurrence wins" all_source_events: list[Event] = [] - for session in source_sessions: + for session in sorted_sessions: all_source_events.extend(session.events) all_source_events.sort(key=lambda e: e.timestamp) diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 5e34d0a9eb..34b3942d71 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -366,9 +366,9 @@ def _clone_session_impl( session_id=new_session_id, ) - # Get latest update time from sorted sessions + # Get latest update time explicitly (don't rely on sorting side effects) latest_update_time = ( - source_sessions[-1].last_update_time if source_sessions else 0.0 + max(s.last_update_time for s in source_sessions) if source_sessions else 0.0 ) # Get the storage session and set events diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 9048617149..c8b088f278 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -314,6 +314,9 @@ async def clone_session( ) # Copy events to the new session (deep copy to avoid mutations) + # Note: Each event requires a separate API call to Vertex AI. For sessions + # with many events, this may be slow. Vertex AI does not currently support + # batch event appending. for event in all_events: await self.append_event(new_session, copy.deepcopy(event)) From 608979b72f8faed638628a5f539245f421eb928f Mon Sep 17 00:00:00 2001 From: lwangverizon Date: Sat, 17 Jan 2026 21:35:17 -0500 Subject: [PATCH 10/15] Update src/google/adk/sessions/vertex_ai_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/vertex_ai_session_service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index c8b088f278..a2c24a8a43 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -300,6 +300,11 @@ async def clone_session( ] fetched_sessions = await asyncio.gather(*fetch_tasks) source_sessions = [s for s in fetched_sessions if s is not None] + if not source_sessions and list_response.sessions: + raise ValueError( + f'Could not retrieve any source sessions for user {src_user_id}. ' + 'They may have been deleted after being listed.' + ) # Use shared helper for state merging and event deduplication merged_state, all_events = self._prepare_sessions_for_cloning( From 74dd8e742c1df47377819aee3c0f804381130c19 Mon Sep 17 00:00:00 2001 From: lwangverizon Date: Sat, 17 Jan 2026 21:35:29 -0500 Subject: [PATCH 11/15] Update src/google/adk/sessions/sqlite_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/sqlite_session_service.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index ac5fa8acfd..f0d84f76ca 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -409,11 +409,8 @@ async def clone_session( # Step 3: Map events back to sessions events_by_session_id = {} for row in event_rows: - sid = row["session_id"] - if sid not in events_by_session_id: - events_by_session_id[sid] = [] - events_by_session_id[sid].append( - Event.model_validate_json(row["event_data"]) + events_by_session_id.setdefault(row['session_id'], []).append( + Event.model_validate_json(row['event_data']) ) # Build full session objects with events From 6f38fc20cf8ee75bd05f0bb27dfcc38bd98360ce Mon Sep 17 00:00:00 2001 From: lwangverizon Date: Sat, 17 Jan 2026 21:35:41 -0500 Subject: [PATCH 12/15] Update src/google/adk/sessions/database_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/database_session_service.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 9040084eb0..dd2cf80073 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -473,10 +473,9 @@ async def clone_session( # Step 3: Map events back to sessions events_by_session_id = {} for storage_event in all_storage_events: - sid = storage_event.session_id - if sid not in events_by_session_id: - events_by_session_id[sid] = [] - events_by_session_id[sid].append(storage_event.to_event()) + events_by_session_id.setdefault(storage_event.session_id, []).append( + storage_event.to_event() + ) # Build full session objects with events for sess in list_response.sessions: From 8d05a3fcbe076968beaf593172ce3f2358e734b1 Mon Sep 17 00:00:00 2001 From: lwangverizon Date: Sat, 17 Jan 2026 21:35:55 -0500 Subject: [PATCH 13/15] Update src/google/adk/sessions/base_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/base_session_service.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 4c073b77e4..2b6dd8bb00 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -185,10 +185,9 @@ def _prepare_sessions_for_cloning( all_events: list[Event] = [] seen_event_ids: set[str] = set() for event in all_source_events: - if event.id in seen_event_ids: - continue - seen_event_ids.add(event.id) - all_events.append(event) + if event.id not in seen_event_ids: + seen_event_ids.add(event.id) + all_events.append(event) return merged_state, all_events From bc8839ab2d2cf8c4061568b185d67edfd5c5cbca Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 21:47:16 -0500 Subject: [PATCH 14/15] format clone_session only --- src/google/adk/sessions/database_session_service.py | 7 +++---- src/google/adk/sessions/in_memory_session_service.py | 11 +++++++---- src/google/adk/sessions/sqlite_session_service.py | 7 +++---- src/google/adk/sessions/vertex_ai_session_service.py | 3 +-- tests/unittests/sessions/test_session_service.py | 4 +++- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index dd2cf80073..09de7c6417 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -443,8 +443,7 @@ async def clone_session( ) if not session: raise ValueError( - f"Source session {src_session_id} not found for user" - f" {src_user_id}." + f"Source session {src_session_id} not found for user {src_user_id}." ) source_sessions.append(session) else: @@ -529,8 +528,8 @@ async def append_event(self, session: Session, event: Event) -> Event: if storage_session.update_timestamp_tz > session.last_update_time: raise ValueError( "The last_update_time provided in the session object" - f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is" - " earlier than the update_time in the storage_session" + f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'}" + " is earlier than the update_time in the storage_session" f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}." " Please check if it is a stale session." ) diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 34b3942d71..47c900cf81 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -327,8 +327,7 @@ def _clone_session_impl( ) if not session: raise ValueError( - f'Source session {src_session_id} not found for user' - f' {src_user_id}.' + f'Source session {src_session_id} not found for user {src_user_id}.' ) source_sessions.append(session) else: @@ -348,7 +347,9 @@ def _clone_session_impl( # Deep copy the session to avoid mutations copied_session = copy.deepcopy(storage_session) # Merge state with app and user state - copied_session = self._merge_state(app_name, src_user_id, copied_session) + copied_session = self._merge_state( + app_name, src_user_id, copied_session + ) source_sessions.append(copied_session) # Use shared helper for state merging and event deduplication @@ -368,7 +369,9 @@ def _clone_session_impl( # Get latest update time explicitly (don't rely on sorting side effects) latest_update_time = ( - max(s.last_update_time for s in source_sessions) if source_sessions else 0.0 + max(s.last_update_time for s in source_sessions) + if source_sessions + else 0.0 ) # Get the storage session and set events diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index f0d84f76ca..6592709594 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -380,8 +380,7 @@ async def clone_session( ) if not session: raise ValueError( - f"Source session {src_session_id} not found for user" - f" {src_user_id}." + f"Source session {src_session_id} not found for user {src_user_id}." ) source_sessions.append(session) else: @@ -409,8 +408,8 @@ async def clone_session( # Step 3: Map events back to sessions events_by_session_id = {} for row in event_rows: - events_by_session_id.setdefault(row['session_id'], []).append( - Event.model_validate_json(row['event_data']) + events_by_session_id.setdefault(row["session_id"], []).append( + Event.model_validate_json(row["event_data"]) ) # Build full session objects with events diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index a2c24a8a43..decea6bd13 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -276,8 +276,7 @@ async def clone_session( ) if not session: raise ValueError( - f'Source session {src_session_id} not found for user' - f' {src_user_id}.' + f'Source session {src_session_id} not found for user {src_user_id}.' ) source_sessions.append(session) else: diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index e1f1789954..0b3223ac5b 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -759,7 +759,9 @@ async def test_clone_session_preserves_event_content(session_service): assert cloned_event.author == event.author assert cloned_event.content == event.content assert cloned_event.actions.artifact_delta == event.actions.artifact_delta - assert cloned_event.actions.transfer_to_agent == event.actions.transfer_to_agent + assert ( + cloned_event.actions.transfer_to_agent == event.actions.transfer_to_agent + ) @pytest.mark.asyncio From d4f4e9b0e8c72486a651b0126c5b1b0ba8d0f225 Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Sat, 17 Jan 2026 22:38:33 -0500 Subject: [PATCH 15/15] consolidate test cases --- .../sessions/test_session_service.py | 126 ++++++------------ 1 file changed, 43 insertions(+), 83 deletions(-) diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 0b3223ac5b..d909951679 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -606,98 +606,64 @@ async def test_partial_events_are_not_persisted(session_service): @pytest.mark.asyncio -async def test_clone_session_basic(session_service): - """Test basic clone_session functionality with specific session_id.""" +@pytest.mark.parametrize( + 'new_user_id,new_session_id', + [ + (None, None), # Basic clone - same user, auto-generated session ID + ('user2', None), # Different user, auto-generated session ID + (None, 'custom_session'), # Same user, custom session ID + ('user2', 'custom_session'), # Different user and custom session ID + ], + ids=['basic', 'different_user', 'custom_session_id', 'both_custom'], +) +async def test_clone_session_single_session( + session_service, new_user_id, new_session_id +): + """Test clone_session with various parameter combinations.""" app_name = 'my_app' - user_id = 'user' + source_user_id = 'user' # Create source session with events source_session = await session_service.create_session( - app_name=app_name, user_id=user_id, state={'key': 'value'} + app_name=app_name, user_id=source_user_id, state={'key': 'value'} ) event1 = Event(invocation_id='inv1', author='user') event2 = Event(invocation_id='inv2', author='model') await session_service.append_event(source_session, event1) await session_service.append_event(source_session, event2) - # Clone the session using source identifiers + # Clone the session cloned_session = await session_service.clone_session( app_name=app_name, - src_user_id=user_id, + src_user_id=source_user_id, src_session_id=source_session.id, + new_user_id=new_user_id, + new_session_id=new_session_id, ) + # Determine expected values + expected_user_id = new_user_id if new_user_id else source_user_id + # Verify the cloned session assert cloned_session is not None assert cloned_session.id != source_session.id assert cloned_session.app_name == app_name - assert cloned_session.user_id == user_id + assert cloned_session.user_id == expected_user_id assert cloned_session.state == {'key': 'value'} assert len(cloned_session.events) == 2 assert cloned_session.events[0].invocation_id == event1.invocation_id assert cloned_session.events[1].invocation_id == event2.invocation_id - -@pytest.mark.asyncio -async def test_clone_session_with_different_user_id(session_service): - """Test clone_session with a different destination user_id.""" - app_name = 'my_app' - source_user_id = 'user1' - dest_user_id = 'user2' - - # Create source session - source_session = await session_service.create_session( - app_name=app_name, user_id=source_user_id, state={'key': 'value'} - ) - event = Event(invocation_id='inv1', author='user') - await session_service.append_event(source_session, event) - - # Clone to different user - cloned_session = await session_service.clone_session( - app_name=app_name, - src_user_id=source_user_id, - src_session_id=source_session.id, - new_user_id=dest_user_id, - ) - - # Verify - assert cloned_session.user_id == dest_user_id - assert cloned_session.app_name == app_name - assert len(cloned_session.events) == 1 + # Verify custom session ID if provided + if new_session_id: + assert cloned_session.id == new_session_id # Verify the cloned session is persisted correctly fetched_session = await session_service.get_session( - app_name=app_name, user_id=dest_user_id, session_id=cloned_session.id + app_name=app_name, user_id=expected_user_id, session_id=cloned_session.id ) assert fetched_session is not None - assert fetched_session.user_id == dest_user_id - - -@pytest.mark.asyncio -async def test_clone_session_with_custom_session_id(session_service): - """Test clone_session with a custom destination session_id.""" - app_name = 'my_app' - user_id = 'user' - custom_session_id = 'custom_cloned_session' - - # Create source session - source_session = await session_service.create_session( - app_name=app_name, user_id=user_id - ) - event = Event(invocation_id='inv1', author='user') - await session_service.append_event(source_session, event) - - # Clone with custom ID - cloned_session = await session_service.clone_session( - app_name=app_name, - src_user_id=user_id, - src_session_id=source_session.id, - new_session_id=custom_session_id, - ) - - # Verify - assert cloned_session.id == custom_session_id - assert len(cloned_session.events) == 1 + assert fetched_session.user_id == expected_user_id @pytest.mark.asyncio @@ -849,32 +815,26 @@ async def test_clone_all_user_sessions(session_service): @pytest.mark.asyncio -async def test_clone_session_no_source_raises_error(session_service): - """Test that clone_session raises error if source session not found.""" - app_name = 'my_app' - user_id = 'user' - - # Try to clone non-existent session - with pytest.raises(ValueError, match='not found'): - await session_service.clone_session( - app_name=app_name, - src_user_id=user_id, - src_session_id='non_existent_session', - ) - - -@pytest.mark.asyncio -async def test_clone_all_sessions_no_sessions_raises_error(session_service): - """Test clone_session raises error when user has no sessions.""" +@pytest.mark.parametrize( + 'src_session_id,error_match', + [ + ('non_existent_session', 'not found'), # Specific session not found + (None, 'No sessions found'), # User has no sessions (clone all mode) + ], + ids=['session_not_found', 'no_sessions_for_user'], +) +async def test_clone_session_source_not_found_raises_error( + session_service, src_session_id, error_match +): + """Test clone_session raises ValueError when source cannot be found.""" app_name = 'my_app' user_id = 'user_with_no_sessions' - # Try to clone all sessions for user with no sessions - with pytest.raises(ValueError, match='No sessions found'): + with pytest.raises(ValueError, match=error_match): await session_service.clone_session( app_name=app_name, src_user_id=user_id, - # No src_session_id means clone all sessions + src_session_id=src_session_id, )