diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index dc5093f2f2..42365452aa 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -229,8 +229,8 @@ Fluent-bit supports two modes depending on how you want to access logs. === "Full mode" - Logs are shipped to Fluent-bit and can be read back through the dstack UI and CLI via Elasticsearch or OpenSearch. - Use this mode when you want a complete integration with log viewing in dstack: + Logs are shipped to Fluent-bit and can be read back through the `dstack` UI and CLI via Elasticsearch or OpenSearch. + Use this mode when you want a complete integration with log viewing in `dstack`: ```shell $ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \ @@ -244,7 +244,7 @@ Fluent-bit supports two modes depending on how you want to access logs. The dstack UI/CLI will show empty logs. Use this mode when: - You have an existing logging infrastructure (Kibana, Grafana, Datadog, etc.) - - You only need to forward logs without reading them back through dstack + - You only need to forward logs without reading them back through `dstack` - You want to reduce operational complexity by not running Elasticsearch/OpenSearch ```shell diff --git a/src/dstack/_internal/server/services/logs/__init__.py b/src/dstack/_internal/server/services/logs/__init__.py index 1f8565d49c..bc601688bc 100644 --- a/src/dstack/_internal/server/services/logs/__init__.py +++ b/src/dstack/_internal/server/services/logs/__init__.py @@ -2,6 +2,7 @@ from typing import List, Optional from uuid import UUID +from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.logs import JobSubmissionLogs from dstack._internal.server import settings from dstack._internal.server.models import ProjectModel @@ -105,9 +106,13 @@ def write_logs( async def poll_logs_async(project: ProjectModel, request: PollLogsRequest) -> JobSubmissionLogs: - job_submission_logs = await run_async( - get_log_storage().poll_logs, project=project, request=request - ) + try: + job_submission_logs = await run_async( + get_log_storage().poll_logs, project=project, request=request + ) + except LogStorageError as e: + logger.error("Failed to poll logs from log storage: %s", repr(e)) + raise ServerClientError("Failed to poll logs from log storage") # Logs are stored in plaintext but transmitted in base64 for API/CLI backward compatibility. # Old logs stored in base64 are encoded twice for transmission and shown as base64 in CLI/UI. # We live with that. diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index b45b2988d5..bb97e21f09 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import List, Optional, Protocol from uuid import UUID @@ -99,7 +100,6 @@ def read( try: response = self._client.search(**search_params) except ElasticsearchError as e: - logger.error("Elasticsearch/OpenSearch search error: %s", e) raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e hits = response.get("hits", {}).get("hits", []) @@ -112,8 +112,6 @@ def read( message = source.get("message", "") if timestamp_str: - from datetime import datetime - try: timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) except ValueError: @@ -180,16 +178,10 @@ def write(self, tag: str, records: List[dict]) -> None: ) response.raise_for_status() except httpx.HTTPStatusError as e: - logger.error( - "Fluent-bit HTTP request failed with status %d: %s", - e.response.status_code, - e.response.text, - ) raise LogStorageError( f"Fluent-bit HTTP error: status {e.response.status_code}" ) from e except httpx.HTTPError as e: - logger.error("Failed to write log to Fluent-bit via HTTP: %s", e) raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e def close(self) -> None: @@ -206,7 +198,6 @@ def write(self, tag: str, records: List[dict]) -> None: for record in records: if not self._sender.emit(tag, record): error = self._sender.last_error - logger.error("Failed to write log to Fluent-bit via Forward: %s", error) self._sender.clear_last_error() raise LogStorageError(f"Fluent-bit Forward error: {error}") @@ -271,7 +262,7 @@ def __init__( index=es_index, api_key=es_api_key, ) - logger.debug( + logger.info( "Fluent-bit log storage initialized with Elasticsearch/OpenSearch reader" ) else: