Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/docs/guides/server-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ Fluent-bit supports two modes depending on how you want to access logs.

=== "Full mode"

Logs are shipped to Fluent-bit and can be read back through the dstack UI and CLI via Elasticsearch or OpenSearch.
Use this mode when you want a complete integration with log viewing in dstack:
Logs are shipped to Fluent-bit and can be read back through the `dstack` UI and CLI via Elasticsearch or OpenSearch.
Use this mode when you want a complete integration with log viewing in `dstack`:

```shell
$ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \
Expand All @@ -244,7 +244,7 @@ Fluent-bit supports two modes depending on how you want to access logs.
The dstack UI/CLI will show empty logs. Use this mode when:

- You have an existing logging infrastructure (Kibana, Grafana, Datadog, etc.)
- You only need to forward logs without reading them back through dstack
- You only need to forward logs without reading them back through `dstack`
- You want to reduce operational complexity by not running Elasticsearch/OpenSearch

```shell
Expand Down
11 changes: 8 additions & 3 deletions src/dstack/_internal/server/services/logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Optional
from uuid import UUID

from dstack._internal.core.errors import ServerClientError
from dstack._internal.core.models.logs import JobSubmissionLogs
from dstack._internal.server import settings
from dstack._internal.server.models import ProjectModel
Expand Down Expand Up @@ -105,9 +106,13 @@ def write_logs(


async def poll_logs_async(project: ProjectModel, request: PollLogsRequest) -> JobSubmissionLogs:
job_submission_logs = await run_async(
get_log_storage().poll_logs, project=project, request=request
)
try:
job_submission_logs = await run_async(
get_log_storage().poll_logs, project=project, request=request
)
except LogStorageError as e:
logger.error("Failed to poll logs from log storage: %s", repr(e))
raise ServerClientError("Failed to poll logs from log storage")
# Logs are stored in plaintext but transmitted in base64 for API/CLI backward compatibility.
# Old logs stored in base64 are encoded twice for transmission and shown as base64 in CLI/UI.
# We live with that.
Expand Down
13 changes: 2 additions & 11 deletions src/dstack/_internal/server/services/logs/fluentbit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import List, Optional, Protocol
from uuid import UUID

Expand Down Expand Up @@ -99,7 +100,6 @@ def read(
try:
response = self._client.search(**search_params)
except ElasticsearchError as e:
logger.error("Elasticsearch/OpenSearch search error: %s", e)
raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e

hits = response.get("hits", {}).get("hits", [])
Expand All @@ -112,8 +112,6 @@ def read(
message = source.get("message", "")

if timestamp_str:
from datetime import datetime

try:
timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
except ValueError:
Expand Down Expand Up @@ -180,16 +178,10 @@ def write(self, tag: str, records: List[dict]) -> None:
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.error(
"Fluent-bit HTTP request failed with status %d: %s",
e.response.status_code,
e.response.text,
)
raise LogStorageError(
f"Fluent-bit HTTP error: status {e.response.status_code}"
) from e
except httpx.HTTPError as e:
logger.error("Failed to write log to Fluent-bit via HTTP: %s", e)
raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e

def close(self) -> None:
Expand All @@ -206,7 +198,6 @@ def write(self, tag: str, records: List[dict]) -> None:
for record in records:
if not self._sender.emit(tag, record):
error = self._sender.last_error
logger.error("Failed to write log to Fluent-bit via Forward: %s", error)
self._sender.clear_last_error()
raise LogStorageError(f"Fluent-bit Forward error: {error}")

Expand Down Expand Up @@ -271,7 +262,7 @@ def __init__(
index=es_index,
api_key=es_api_key,
)
logger.debug(
logger.info(
"Fluent-bit log storage initialized with Elasticsearch/OpenSearch reader"
)
else:
Expand Down