From 3ec2f23cd07faeaa5f5522e43e336cf59f282aef Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 13:54:40 +0500 Subject: [PATCH 1/4] Fix missing instance lock in delete_fleets --- .../_internal/server/services/fleets.py | 54 ++++++++++++------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 95ae519d0..2881ce5c5 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -42,7 +42,12 @@ ) from dstack._internal.core.models.projects import Project from dstack._internal.core.models.resources import ResourcesSpec -from dstack._internal.core.models.runs import JobProvisioningData, Requirements, get_policy_map +from dstack._internal.core.models.runs import ( + JobProvisioningData, + Requirements, + RunStatus, + get_policy_map, +) from dstack._internal.core.models.users import GlobalRole from dstack._internal.core.services import validate_dstack_resource_name from dstack._internal.core.services.diff import ModelDiff, copy_model, diff_models @@ -53,6 +58,7 @@ JobModel, MemberModel, ProjectModel, + RunModel, UserModel, ) from dstack._internal.server.services import events @@ -613,48 +619,56 @@ async def delete_fleets( instance_nums: Optional[List[int]] = None, ): res = await session.execute( - select(FleetModel) + select(FleetModel.id) .where( FleetModel.project_id == project.id, FleetModel.name.in_(names), FleetModel.deleted == False, ) - .options(joinedload(FleetModel.instances)) + .order_by(FleetModel.id) # take locks in order + .with_for_update(key_share=True) ) - fleet_models = res.scalars().unique().all() - fleets_ids = sorted([f.id for f in fleet_models]) - instances_ids = sorted([i.id for f in fleet_models for i in f.instances]) - await session.commit() - logger.info("Deleting fleets: %s", [v.name for v in fleet_models]) + fleets_ids = list(res.scalars().unique().all()) + res = await session.execute( + select(InstanceModel.id) + .where( + InstanceModel.fleet_id.in_(fleets_ids), + InstanceModel.deleted == False, + ) + .order_by(InstanceModel.id) # take locks in order + .with_for_update(key_share=True) + ) + instances_ids = list(res.scalars().unique().all()) + if is_db_sqlite(): + # Start new transaction to see committed changes after lock + await session.commit() async with ( get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, fleets_ids), get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids), ): - # Refetch after lock - # TODO: Lock instances with FOR UPDATE? - # TODO: Do not lock fleet when deleting only instances + # Refetch after lock. + # TODO: Do not lock fleet when deleting only instances. res = await session.execute( select(FleetModel) - .where( - FleetModel.project_id == project.id, - FleetModel.name.in_(names), - FleetModel.deleted == False, - ) + .where(FleetModel.id.in_(fleets_ids)) .options( - selectinload(FleetModel.instances) + joinedload(FleetModel.instances.and_(InstanceModel.id.in_(instances_ids))) .joinedload(InstanceModel.jobs) .load_only(JobModel.id) ) - .options(selectinload(FleetModel.runs)) + .options( + joinedload( + FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses())) + ) + ) .execution_options(populate_existing=True) - .order_by(FleetModel.id) # take locks in order - .with_for_update(key_share=True) ) fleet_models = res.scalars().unique().all() fleets = [fleet_model_to_fleet(m) for m in fleet_models] for fleet in fleets: if fleet.spec.configuration.ssh_config is not None: _check_can_manage_ssh_fleets(user=user, project=project) + logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) for fleet_model in fleet_models: _terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums) # TERMINATING fleets are deleted by process_fleets after instances are terminated From d1ea5e587f8505be474b77c84cde6354f02a2ab8 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 13:54:56 +0500 Subject: [PATCH 2/4] Handle terminating deleted instances --- .../background/tasks/process_instances.py | 12 ++++---- .../tasks/test_process_instances.py | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 9a14bdc30..454d6ee18 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -11,7 +11,7 @@ from pydantic import ValidationError from sqlalchemy import and_, delete, func, not_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, with_loader_criteria +from sqlalchemy.orm import joinedload from dstack._internal import settings from dstack._internal.core.backends.base.compute import ( @@ -218,9 +218,8 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .options(joinedload(InstanceModel.project).joinedload(ProjectModel.backends)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) .options( - joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), - with_loader_criteria( - InstanceModel, InstanceModel.deleted == False, include_aliases=True + joinedload(InstanceModel.fleet).joinedload( + FleetModel.instances.and_(InstanceModel.deleted == False) ), ) .execution_options(populate_existing=True) @@ -233,9 +232,8 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): .options(joinedload(InstanceModel.project)) .options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status)) .options( - joinedload(InstanceModel.fleet).joinedload(FleetModel.instances), - with_loader_criteria( - InstanceModel, InstanceModel.deleted == False, include_aliases=True + joinedload(InstanceModel.fleet).joinedload( + FleetModel.instances.and_(InstanceModel.deleted == False) ), ) .execution_options(populate_existing=True) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index a72dc0c16..5eff68c8c 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -597,6 +597,34 @@ async def test_terminate(self, test_db, session: AsyncSession): assert instance.deleted_at is not None assert instance.finished_at is not None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_terminates_terminating_deleted_instance(self, test_db, session: AsyncSession): + # There was a race condition when instance could stay in Terminating while marked as deleted. + # TODO: + project = await create_project(session=session) + instance = await create_instance( + session=session, project=project, status=InstanceStatus.TERMINATING + ) + instance.deleted = True + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT + instance.last_job_processed_at = instance.deleted_at = ( + get_current_datetime() + dt.timedelta(minutes=-19) + ) + await session.commit() + + with self.mock_terminate_in_backend() as mock: + await process_instances() + mock.assert_called_once() + + await session.refresh(instance) + + assert instance is not None + assert instance.status == InstanceStatus.TERMINATED + assert instance.deleted == True + assert instance.deleted_at is not None + assert instance.finished_at is not None + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @pytest.mark.parametrize( From f2dc85a91f1a7e76cd6fd7f323ef3c475ec74fcd Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 14:00:56 +0500 Subject: [PATCH 3/4] Fix comment --- .../_internal/server/background/tasks/test_process_instances.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index 5eff68c8c..38bffc442 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -601,7 +601,7 @@ async def test_terminate(self, test_db, session: AsyncSession): @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_terminates_terminating_deleted_instance(self, test_db, session: AsyncSession): # There was a race condition when instance could stay in Terminating while marked as deleted. - # TODO: + # TODO: Drop this after all such "bad" instances are processed. project = await create_project(session=session) instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING From 3874da50d3e32f74a8b8710411d73ea08c62894d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 19 Jan 2026 15:38:11 +0500 Subject: [PATCH 4/4] Fix log message --- src/dstack/_internal/server/services/fleets.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 2881ce5c5..588f34698 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -668,7 +668,12 @@ async def delete_fleets( for fleet in fleets: if fleet.spec.configuration.ssh_config is not None: _check_can_manage_ssh_fleets(user=user, project=project) - logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) + if instance_nums is None: + logger.info("Deleting fleets: %s", [f.name for f in fleet_models]) + else: + logger.info( + "Deleting fleets %s instances %s", [f.name for f in fleet_models], instance_nums + ) for fleet_model in fleet_models: _terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums) # TERMINATING fleets are deleted by process_fleets after instances are terminated