Draft
Changes from all commits
36 commits
757ee12
feat: Span streaming & new span API
sentrivana Jan 15, 2026
705790a
More refactor, fixing some types
sentrivana Jan 15, 2026
b3f4f98
Cant use sentry_sdk.trace as that already exists
sentrivana Jan 15, 2026
758cdf3
Merge branch 'master' into feat/span-first-2
sentrivana Jan 15, 2026
97cf291
bubu
sentrivana Jan 15, 2026
0d2097b
dsc, sampling
sentrivana Jan 15, 2026
b01caab
.
sentrivana Jan 16, 2026
e946df6
.
sentrivana Jan 16, 2026
c47a0d0
fix? some types
sentrivana Jan 16, 2026
131f61c
safeguards, some type fixes
sentrivana Jan 16, 2026
40878e3
.
sentrivana Jan 16, 2026
3f985c4
move to traces.py
sentrivana Jan 20, 2026
d9874db
fix multiple envelopes being sent from the batcher
sentrivana Jan 20, 2026
a173352
send outside of lock
sentrivana Jan 20, 2026
fcb8ae5
.
sentrivana Jan 20, 2026
bc2fd79
old py
sentrivana Jan 20, 2026
73c6b73
more old py
sentrivana Jan 20, 2026
8519669
Merge branch 'master' into feat/span-first-2
sentrivana Jan 21, 2026
d40367d
trace propagation
sentrivana Jan 22, 2026
89d6084
fix tracing utils
sentrivana Jan 22, 2026
0e8ab89
move stuff around
sentrivana Jan 22, 2026
4b5c205
profiler, source, op
sentrivana Jan 22, 2026
546c2f0
prepare for profiler changes
sentrivana Jan 22, 2026
9b2b592
source, segment attrs/props
sentrivana Jan 22, 2026
0e198f2
add todo
sentrivana Jan 22, 2026
dca0870
profiler fixes, asgi first pass, sampling on start
sentrivana Jan 22, 2026
68cf5d2
.
sentrivana Jan 22, 2026
dedfaf0
starlette
sentrivana Jan 22, 2026
3ca46dc
asgi fixes, set_origin
sentrivana Jan 22, 2026
6673978
stdlib
sentrivana Jan 22, 2026
e72d3cd
httpx fix
sentrivana Jan 22, 2026
016c341
sqlalchemy
sentrivana Jan 22, 2026
802c7e9
fix
sentrivana Jan 22, 2026
7c35261
ctx mng things
sentrivana Jan 22, 2026
3f2747c
fix
sentrivana Jan 22, 2026
be9094b
mypy
sentrivana Jan 22, 2026
137 changes: 137 additions & 0 deletions sentry_sdk/_span_batcher.py
@@ -0,0 +1,137 @@
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr

if TYPE_CHECKING:
    from typing import Any, Callable, Optional
    from sentry_sdk.traces import SpanStatus, StreamedSpan
    from sentry_sdk._types import SerializedAttributeValue


class SpanBatcher(Batcher["StreamedSpan"]):
    # TODO[span-first]: size-based flushes
    # TODO[span-first]: adjust flush/drop defaults
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 5000
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        # caller is responsible for locking before checking this
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span: "StreamedSpan") -> None:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        res: "dict[str, Any]" = {
            "trace_id": item.trace_id,
            "span_id": item.span_id,
            "name": item.get_name(),
            "status": item.status.value,
            "is_segment": item.is_segment(),
            "start_timestamp": item.start_timestamp.timestamp(), # TODO[span-first]
        }

        if item.timestamp:
            # this is here to make mypy happy
            res["end_timestamp"] = item.timestamp.timestamp()

        if item.parent_span_id:
            res["parent_span_id"] = item.parent_span_id

        if item.attributes:
            res["attributes"] = {
                k: serialize_attribute(v) for (k, v) in item.attributes.items()
            }

        return res

    def _flush(self) -> None:
        with self._lock:
            if len(self._span_buffer) == 0:
                return None

            envelopes = []
            for trace_id, spans in self._span_buffer.items():
                if spans:
                    for span in spans:
                        print(span.name)
                    dsc = spans[0].dynamic_sampling_context()

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )

                    envelope.add_item(
                        Item(
                            type="span",
                            content_type="application/vnd.sentry.items.span.v2+json",
                            headers={
                                "item_count": len(spans),
                            },
                            payload=PayloadRef(
                                json={
                                    "items": [
                                        self._to_transport_format(span)
                                        for span in spans
                                    ]
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

            self._span_buffer.clear()

        for envelope in envelopes:
            self._capture_func(envelope)
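
Note on the batcher above: the per-trace bucketing exists because an envelope carries a single trace header, so spans from different traces have to go out in separate envelopes. A minimal, self-contained sketch of that grouping step (stand-in dicts, not the SDK's `StreamedSpan` type):

```python
from collections import defaultdict

# Stand-ins for finished spans; the real batcher stores StreamedSpan objects.
finished_spans = [
    {"trace_id": "a" * 32, "span_id": "1" * 16, "name": "GET /"},
    {"trace_id": "a" * 32, "span_id": "2" * 16, "name": "db.query"},
    {"trace_id": "b" * 32, "span_id": "3" * 16, "name": "GET /health"},
]

# Group by trace_id, mirroring SpanBatcher._span_buffer: each bucket later
# becomes one envelope carrying that trace's header and a single "span" item.
buckets: dict[str, list[dict]] = defaultdict(list)
for span in finished_spans:
    buckets[span["trace_id"]].append(span)

for trace_id, bucket in buckets.items():
    print(f"trace {trace_id[:8]}: {len(bucket)} span(s) in one envelope")
```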
15 changes: 14 additions & 1 deletion sentry_sdk/api.py
@@ -7,6 +7,7 @@
from sentry_sdk.consts import INSTRUMENTER
from sentry_sdk.scope import Scope, _ScopeManager, new_scope, isolation_scope
from sentry_sdk.tracing import NoOpSpan, Transaction, trace
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.crons import monitor

from typing import TYPE_CHECKING
@@ -385,7 +386,9 @@ def set_measurement(name: str, value: float, unit: "MeasurementUnit" = "") -> No
    transaction.set_measurement(name, value, unit)


def get_current_span(scope: "Optional[Scope]" = None) -> "Optional[Span]":
def get_current_span(
    scope: "Optional[Scope]" = None,
) -> "Optional[Union[Span, StreamedSpan]]":
    """
    Returns the currently active span if there is one running, otherwise `None`
    """
@@ -501,6 +504,16 @@ def update_current_span(
    if current_span is None:
        return

    if isinstance(current_span, StreamedSpan):
        warnings.warn(
            "The `update_current_span` API isn't available in streaming mode. "
            "Retrieve the current span with get_current_span() and use its API "
            "directly.",
            DeprecationWarning,
            stacklevel=2,
        )
        return

    if op is not None:
        current_span.op = op

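For context on the API change above: in streaming mode `update_current_span()` becomes a warn-and-return no-op, and callers are pointed at `get_current_span()` instead. A small hedged sketch of what that looks like (assumes span streaming is enabled and a `StreamedSpan` is currently active; the `StreamedSpan` mutation API itself is not part of this hunk):

```python
import warnings

import sentry_sdk

# With span streaming enabled and a StreamedSpan active, this no longer
# mutates the span; it emits a DeprecationWarning and returns.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    sentry_sdk.update_current_span(op="db.query")
# `caught` now holds the DeprecationWarning pointing at get_current_span().

# Preferred path in streaming mode: work with the span object directly.
span = sentry_sdk.get_current_span()  # Optional[Union[Span, StreamedSpan]]
```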
29 changes: 28 additions & 1 deletion sentry_sdk/client.py
@@ -11,6 +11,7 @@
import sentry_sdk
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
from sentry_sdk._metrics_batcher import MetricsBatcher
from sentry_sdk._span_batcher import SpanBatcher
from sentry_sdk.utils import (
    AnnotatedValue,
    ContextVar,
@@ -31,6 +32,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.consts import (
    SPANDATA,
@@ -67,6 +69,7 @@
    from sentry_sdk.scope import Scope
    from sentry_sdk.session import Session
    from sentry_sdk.spotlight import SpotlightClient
    from sentry_sdk.traces import StreamedSpan
    from sentry_sdk.transport import Transport, Item
    from sentry_sdk._log_batcher import LogBatcher
    from sentry_sdk._metrics_batcher import MetricsBatcher
@@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
        self.monitor: "Optional[Monitor]" = None
        self.log_batcher: "Optional[LogBatcher]" = None
        self.metrics_batcher: "Optional[MetricsBatcher]" = None
        self.span_batcher: "Optional[SpanBatcher]" = None
        self.integrations: "dict[str, Integration]" = {}

    def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -224,6 +228,9 @@ def _capture_log(self, log: "Log", scope: "Scope") -> None:
    def _capture_metric(self, metric: "Metric", scope: "Scope") -> None:
        pass

    def _capture_span(self, span: "StreamedSpan", scope: "Scope") -> None:
        pass

    def capture_session(self, *args: "Any", **kwargs: "Any") -> None:
        return None

@@ -399,6 +406,13 @@ def _record_lost_event(
                    record_lost_func=_record_lost_event,
                )

            self.span_batcher = None
            if has_span_streaming_enabled(self.options):
                self.span_batcher = SpanBatcher(
                    capture_func=_capture_envelope,
                    record_lost_func=_record_lost_event,
                )

            max_request_body_size = ("always", "never", "small", "medium")
            if self.options["max_request_body_size"] not in max_request_body_size:
                raise ValueError(
@@ -909,7 +923,10 @@ def capture_event(
        return return_value

    def _capture_telemetry(
        self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
        self,
        telemetry: "Optional[Union[Log, Metric, StreamedSpan]]",
        ty: str,
        scope: "Scope",
    ) -> None:
        # Capture attributes-based telemetry (logs, metrics, spansV2)
        if telemetry is None:
@@ -922,6 +939,7 @@ def _capture_telemetry(
            before_send = get_before_send_log(self.options)
        elif ty == "metric":
            before_send = get_before_send_metric(self.options) # type: ignore
        # no before_send for spans

        if before_send is not None:
            telemetry = before_send(telemetry, {}) # type: ignore
@@ -934,6 +952,8 @@ def _capture_telemetry(
            batcher = self.log_batcher
        elif ty == "metric":
            batcher = self.metrics_batcher # type: ignore
        elif ty == "span":
            batcher = self.span_batcher # type: ignore

        if batcher is not None:
            batcher.add(telemetry) # type: ignore
@@ -944,6 +964,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
    def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None:
        self._capture_telemetry(metric, "metric", scope)

    def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None:
        self._capture_telemetry(span, "span", scope)

    def capture_session(
        self,
        session: "Session",
@@ -993,6 +1016,8 @@ def close(
                self.log_batcher.kill()
            if self.metrics_batcher is not None:
                self.metrics_batcher.kill()
            if self.span_batcher is not None:
                self.span_batcher.kill()
            if self.monitor:
                self.monitor.kill()
            self.transport.kill()
@@ -1018,6 +1043,8 @@ def flush(
                self.log_batcher.flush()
            if self.metrics_batcher is not None:
                self.metrics_batcher.flush()
            if self.span_batcher is not None:
                self.span_batcher.flush()
            self.transport.flush(timeout=timeout, callback=callback)

    def __enter__(self) -> "_Client":
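Taken together, the client changes route streamed spans through the same attribute-based telemetry path as logs and metrics: one batcher per telemetry type, flushed and killed alongside the others. A simplified sketch of that dispatch (illustration only; the real `_capture_telemetry()` also applies `before_send_log`/`before_send_metric` and scope handling):

```python
# Simplified routing: pick the batcher by telemetry type and hand the item off.
# Spans deliberately have no before_send hook in this PR.
def route_telemetry(client, telemetry, ty):
    batcher = None
    if ty == "log":
        batcher = client.log_batcher
    elif ty == "metric":
        batcher = client.metrics_batcher
    elif ty == "span":
        batcher = client.span_batcher  # new in this PR

    if batcher is not None:
        batcher.add(telemetry)
```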
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
@@ -82,6 +82,7 @@ class CompressionAlgo(Enum):
        "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
        "enable_metrics": Optional[bool],
        "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
        "trace_lifecycle": Optional[Literal["static", "stream"]],
    },
    total=False,
)
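The new `trace_lifecycle` option is what `has_span_streaming_enabled()` checks for. A hedged example of opting in (assumption: the option lives under `_experiments`, as the surrounding TypedDict entries suggest; only the option name and the "static"/"stream" values come from this diff):

```python
import sentry_sdk

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    traces_sample_rate=1.0,
    # Assumption: "trace_lifecycle" is an experimental option in this PR.
    # "stream" turns on span streaming; "static" keeps the existing
    # transaction-based lifecycle.
    _experiments={"trace_lifecycle": "stream"},
)
```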
2 changes: 2 additions & 0 deletions sentry_sdk/envelope.py
@@ -253,6 +253,8 @@ def data_category(self) -> "EventDataCategory":
            return "session"
        elif ty == "attachment":
            return "attachment"
        elif ty == "span":
            return "span"
        elif ty == "transaction":
            return "transaction"
        elif ty == "event":