diff --git a/Cargo.lock b/Cargo.lock
index 65a4bfdbeb0..f1f46a4d264 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2870,6 +2870,7 @@ dependencies = [
  "object_store",
  "parking_lot",
  "petgraph 0.8.3",
+ "portable-atomic",
  "priority-queue",
  "prometheus",
  "prost",
diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs
index e6f485e2552..2b7d560dfc1 100644
--- a/core/src/subgraph/context/mod.rs
+++ b/core/src/subgraph/context/mod.rs
@@ -23,7 +23,9 @@ use graph::{
     slog::Logger,
 };
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+
+use graph::parking_lot::RwLock;
 use tokio::sync::mpsc;
 
 use self::instance::SubgraphInstance;
@@ -44,18 +46,18 @@ impl SubgraphKeepAlive {
     }
 
     pub fn remove(&self, deployment_id: &DeploymentId) {
-        self.alive_map.write().unwrap().remove(deployment_id);
+        self.alive_map.write().remove(deployment_id);
         self.sg_metrics.running_count.dec();
     }
 
     pub fn insert(&self, deployment_id: DeploymentId, guard: CancelGuard) {
-        let old = self.alive_map.write().unwrap().insert(deployment_id, guard);
+        let old = self.alive_map.write().insert(deployment_id, guard);
         if old.is_none() {
             self.sg_metrics.running_count.inc();
         }
     }
 
     pub fn contains(&self, deployment_id: &DeploymentId) -> bool {
-        self.alive_map.read().unwrap().contains_key(deployment_id)
+        self.alive_map.read().contains_key(deployment_id)
     }
 }
diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index 560e5fe87a4..9c4fc5dc8b4 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -216,6 +216,11 @@ those.
   decisions. Set to `true` to turn simulation on, defaults to `false`
 - `GRAPH_STORE_CONNECTION_TIMEOUT`: How long to wait to connect to a database
   before assuming the database is down in ms. Defaults to 5000ms.
+- `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`: When a database shard is marked
+  unavailable due to connection timeouts, this controls how often to allow a
+  single probe request through to check if the database has recovered. Only one
+  request per interval will attempt a connection; all others fail instantly.
+  Value is in seconds and defaults to 2s.
 - `EXPERIMENTAL_SUBGRAPH_VERSION_SWITCHING_MODE`: default is `instant`, set
   to `synced` to only switch a named subgraph to a new deployment once it has
   synced, making the new deployment the "Pending" version.
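The `RwLock` swap in `context/mod.rs` above recurs through the rest of this diff: `parking_lot::RwLock` cannot be poisoned, so `read()`/`write()` hand back the guard directly and every trailing `.unwrap()` goes away. A minimal sketch of the difference, illustrative rather than taken from this PR:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

fn main() {
    let map: Arc<RwLock<HashMap<u32, &str>>> = Arc::new(RwLock::new(HashMap::new()));

    // parking_lot: no poisoning, so the guard comes back directly.
    map.write().insert(1, "one");

    // The std::sync::RwLock equivalent would read
    // `map.write().unwrap().insert(1, "one")`, which panics if another
    // thread panicked earlier while holding the lock.
    assert_eq!(map.read().get(&1), Some(&"one"));
}
```

The same mechanical rewrite appears in every file below that previously used `std::sync::RwLock`.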
diff --git a/graph/Cargo.toml b/graph/Cargo.toml
index 33cfbd40eb0..7fa31765100 100644
--- a/graph/Cargo.toml
+++ b/graph/Cargo.toml
@@ -79,6 +79,7 @@ futures03 = { version = "0.3.31", package = "futures", features = ["compat"] }
 wasmparser = "0.118.1"
 thiserror = { workspace = true }
 parking_lot = "0.12.5"
+portable-atomic = { version = "1.11", features = ["fallback"] }
 itertools = "0.14.0"
 defer = "0.2"
 
diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs
index cb210040952..4777ea6f62f 100644
--- a/graph/src/components/metrics/registry.rs
+++ b/graph/src/components/metrics/registry.rs
@@ -1,5 +1,7 @@
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+
+use crate::parking_lot::RwLock;
 
 use prometheus::{labels, Histogram, IntCounterVec};
 use prometheus::{IntCounter, IntGauge};
@@ -109,14 +111,13 @@ impl MetricsRegistry {
         };
         let counters = CounterVec::new(opts, variable_labels)?;
         let id = counters.desc().first().unwrap().id;
-        let maybe_counter = self.global_counter_vecs.read().unwrap().get(&id).cloned();
+        let maybe_counter = self.global_counter_vecs.read().get(&id).cloned();
         if let Some(counters) = maybe_counter {
             Ok(counters)
         } else {
             self.register(name, Box::new(counters.clone()));
             self.global_counter_vecs
                 .write()
-                .unwrap()
                 .insert(id, counters.clone());
             Ok(counters)
         }
@@ -161,15 +162,12 @@ impl MetricsRegistry {
     ) -> Result<Counter, PrometheusError> {
         let counter = counter_with_labels(name, help, const_labels)?;
         let id = counter.desc().first().unwrap().id;
-        let maybe_counter = self.global_counters.read().unwrap().get(&id).cloned();
+        let maybe_counter = self.global_counters.read().get(&id).cloned();
         if let Some(counter) = maybe_counter {
             Ok(counter)
         } else {
             self.register(name, Box::new(counter.clone()));
-            self.global_counters
-                .write()
-                .unwrap()
-                .insert(id, counter.clone());
+            self.global_counters.write().insert(id, counter.clone());
             Ok(counter)
         }
     }
@@ -210,15 +208,12 @@ impl MetricsRegistry {
     ) -> Result<Gauge, PrometheusError> {
         let gauge = gauge_with_labels(name, help, const_labels)?;
         let id = gauge.desc().first().unwrap().id;
-        let maybe_gauge = self.global_gauges.read().unwrap().get(&id).cloned();
+        let maybe_gauge = self.global_gauges.read().get(&id).cloned();
         if let Some(gauge) = maybe_gauge {
             Ok(gauge)
         } else {
             self.register(name, Box::new(gauge.clone()));
-            self.global_gauges
-                .write()
-                .unwrap()
-                .insert(id, gauge.clone());
+            self.global_gauges.write().insert(id, gauge.clone());
             Ok(gauge)
         }
     }
@@ -232,15 +227,12 @@ impl MetricsRegistry {
         let opts = Opts::new(name, help);
         let gauges = GaugeVec::new(opts, variable_labels)?;
         let id = gauges.desc().first().unwrap().id;
-        let maybe_gauge = self.global_gauge_vecs.read().unwrap().get(&id).cloned();
+        let maybe_gauge = self.global_gauge_vecs.read().get(&id).cloned();
         if let Some(gauges) = maybe_gauge {
             Ok(gauges)
         } else {
             self.register(name, Box::new(gauges.clone()));
-            self.global_gauge_vecs
-                .write()
-                .unwrap()
-                .insert(id, gauges.clone());
+            self.global_gauge_vecs.write().insert(id, gauges.clone());
             Ok(gauges)
         }
     }
@@ -254,14 +246,13 @@ impl MetricsRegistry {
         let opts = HistogramOpts::new(name, help);
         let histograms = HistogramVec::new(opts, variable_labels)?;
         let id = histograms.desc().first().unwrap().id;
-        let maybe_histogram = self.global_histogram_vecs.read().unwrap().get(&id).cloned();
+        let maybe_histogram = self.global_histogram_vecs.read().get(&id).cloned();
         if let Some(histograms) = maybe_histogram {
             Ok(histograms)
         } else {
             self.register(name, Box::new(histograms.clone()));
             self.global_histogram_vecs
                 .write()
-                .unwrap()
                 .insert(id, histograms.clone());
             Ok(histograms)
         }
diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs
index 77675967c25..a9b65e01f9a 100644
--- a/graph/src/components/store/mod.rs
+++ b/graph/src/components/store/mod.rs
@@ -25,7 +25,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
 use std::fmt;
 use std::fmt::Display;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
@@ -42,7 +42,7 @@ use crate::env::ENV_VARS;
 use crate::internal_error;
 use crate::prelude::{s, Attribute, DeploymentHash, ValueType};
 use crate::schema::{ast as sast, EntityKey, EntityType, InputSchema};
-use crate::util::stats::MovingStats;
+use crate::util::stats::AtomicMovingStats;
 
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct EntityFilterDerivative(bool);
@@ -742,8 +742,8 @@ impl Display for DeploymentLocator {
 }
 
 // The type that the connection pool uses to track wait times for
-// connection checkouts
-pub type PoolWaitStats = Arc<RwLock<MovingStats>>;
+// connection checkouts. Uses lock-free atomic operations internally.
+pub type PoolWaitStats = Arc<AtomicMovingStats>;
 
 /// Determines which columns should be selected in a table.
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs
index b8bdb4a63d0..e1053360e18 100644
--- a/graph/src/data/graphql/load_manager.rs
+++ b/graph/src/data/graphql/load_manager.rs
@@ -4,9 +4,11 @@ use prometheus::core::GenericCounter;
 use rand::{prelude::Rng, rng};
 use std::collections::{HashMap, HashSet};
 use std::iter::FromIterator;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+use crate::parking_lot::RwLock;
+
 use crate::components::metrics::{Counter, GaugeVec, MetricsRegistry};
 use crate::components::store::{DeploymentId, PoolWaitStats};
 use crate::data::graphql::shape_hash::shape_hash;
@@ -57,7 +59,7 @@ impl ShardEffort {
     }
 
     pub fn add(&self, shard: &str, qref: QueryRef, duration: Duration, gauge: &GaugeVec) {
-        let mut inner = self.inner.write().unwrap();
+        let mut inner = self.inner.write();
         inner.add(qref, duration);
         gauge
             .with_label_values(&[shard])
@@ -70,7 +72,7 @@ impl ShardEffort {
     /// data for the particular query, return `None` as the effort
     /// for the query
     pub fn current_effort(&self, qref: &QueryRef) -> (Option<Duration>, Duration) {
-        let inner = self.inner.read().unwrap();
+        let inner = self.inner.read();
         let total_effort = inner.total.duration();
         let query_effort = inner.effort.get(qref).map(|stats| stats.duration());
         (query_effort, total_effort)
@@ -381,7 +383,7 @@ impl LoadManager {
 
         let qref = QueryRef::new(deployment, shape_hash);
 
-        if self.jailed_queries.read().unwrap().contains(&qref) {
+        if self.jailed_queries.read().contains(&qref) {
             return if ENV_VARS.load_simulate {
                 Proceed
             } else {
@@ -426,7 +428,7 @@ impl LoadManager {
                 "query_effort_ms" => query_effort,
                 "total_effort_ms" => total_effort,
                 "ratio" => format!("{:.4}", query_effort/total_effort));
-            self.jailed_queries.write().unwrap().insert(qref);
+            self.jailed_queries.write().insert(qref);
             return if ENV_VARS.load_simulate {
                 Proceed
             } else {
@@ -457,7 +459,7 @@ impl LoadManager {
     }
 
     fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) {
-        let store_avg = wait_stats.read().unwrap().average();
+        let store_avg = wait_stats.average();
         let overloaded = store_avg
             .map(|average| average > ENV_VARS.load_threshold)
             .unwrap_or(false);
@@ -465,7 +467,7 @@ impl LoadManager {
     }
 
     fn kill_state(&self, shard: &str) -> (f64, Instant) {
-        let state = self.kill_state.get(shard).unwrap().read().unwrap();
+        let state = self.kill_state.get(shard).unwrap().read();
         (state.kill_rate, state.last_update)
     }
 
@@ -505,7 +507,7 @@ impl LoadManager {
             kill_rate = (kill_rate - KILL_RATE_STEP_DOWN).max(0.0);
         }
         let event = {
-            let mut state = self.kill_state.get(shard).unwrap().write().unwrap();
+            let mut state = self.kill_state.get(shard).unwrap().write();
             state.kill_rate = kill_rate;
             state.last_update = now;
             state.log_event(now, kill_rate, overloaded)
diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs
index 7a3c16d5b81..cc29cb107bf 100644
--- a/graph/src/env/store.rs
+++ b/graph/src/env/store.rs
@@ -163,6 +163,20 @@ pub struct EnvVarsStore {
     /// Disables storing or reading `eth_call` results from the store call cache.
     /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false.
     pub disable_call_cache: bool,
+    /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false.
+    /// Set to true to disable chain_head_ptr caching (safety escape hatch).
+    pub disable_chain_head_ptr_cache: bool,
+    /// Minimum idle time before running the connection health check (SELECT 67).
+    /// Connections used more recently than this threshold skip validation.
+    /// Set to 0 to always validate (previous behavior).
+    /// Set by `GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS`. Default is 30 seconds.
+    pub connection_validation_idle_secs: Duration,
+    /// When a database shard is marked unavailable due to connection timeouts,
+    /// this controls how often to allow a single probe request through to check
+    /// if the database has recovered. Only one request per interval will attempt
+    /// a connection; all others fail instantly with DatabaseUnavailable.
+    /// Set by `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`. Default is 2 seconds.
+    pub connection_unavailable_retry: Duration,
 }
 
 // This does not print any values avoid accidentally leaking any sensitive env vars
@@ -224,6 +238,11 @@ impl TryFrom<InnerStore> for EnvVarsStore {
             account_like_min_versions_count: x.account_like_min_versions_count,
             account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
             disable_call_cache: x.disable_call_cache,
+            disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache,
+            connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs),
+            connection_unavailable_retry: Duration::from_secs(
+                x.connection_unavailable_retry_in_secs,
+            ),
         };
         if let Some(timeout) = vars.batch_timeout {
             if timeout < 2 * vars.batch_target_duration {
@@ -331,6 +350,12 @@ pub struct InnerStore {
     account_like_max_unique_ratio: Option,
     #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")]
     disable_call_cache: bool,
+    #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")]
+    disable_chain_head_ptr_cache: bool,
+    #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")]
+    connection_validation_idle_secs: u64,
+    #[envconfig(from = "GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY", default = "2")]
+    connection_unavailable_retry_in_secs: u64,
 }
 
 #[derive(Clone, Copy, Debug)]
diff --git a/graph/src/lib.rs b/graph/src/lib.rs
index 0607cab5937..e076d64c736 100644
--- a/graph/src/lib.rs
+++ b/graph/src/lib.rs
@@ -170,7 +170,7 @@ pub mod prelude {
     pub use crate::log::split::split_logger;
     pub use crate::util::cache_weight::CacheWeight;
    pub use crate::util::futures::{retry, TimeoutError};
-    pub use crate::util::stats::MovingStats;
+    pub use crate::util::stats::{AtomicMovingStats, MovingStats};
 
     macro_rules! static_graphql {
         ($m:ident, $m2:ident, {$($n:ident,)*}) => {
diff --git a/graph/src/util/stats.rs b/graph/src/util/stats.rs
index ac608b56dcb..ccfae84574b 100644
--- a/graph/src/util/stats.rs
+++ b/graph/src/util/stats.rs
@@ -1,6 +1,8 @@
 use std::collections::VecDeque;
+use std::sync::atomic::Ordering;
 use std::time::{Duration, Instant};
 
+use portable_atomic::AtomicU128;
 use prometheus::Gauge;
 
 use crate::prelude::ENV_VARS;
@@ -166,6 +168,196 @@ impl MovingStats {
     }
 }
 
+/// Packed bin for atomic operations: epoch (32 bits) | count (32 bits) | duration_nanos (64 bits)
+/// Fits in a single AtomicU128 for lock-free CAS updates.
+#[repr(transparent)]
+struct PackedBin(AtomicU128);
+
+impl PackedBin {
+    fn new() -> Self {
+        Self(AtomicU128::new(0))
+    }
+
+    /// Pack epoch, count, and duration into a single u128 value.
+    fn pack(epoch: u32, count: u32, duration_nanos: u64) -> u128 {
+        ((epoch as u128) << 96) | ((count as u128) << 64) | (duration_nanos as u128)
+    }
+
+    /// Unpack a u128 value into (epoch, count, duration_nanos).
+    fn unpack(packed: u128) -> (u32, u32, u64) {
+        let epoch = (packed >> 96) as u32;
+        let count = (packed >> 64) as u32;
+        let duration_nanos = packed as u64;
+        (epoch, count, duration_nanos)
+    }
+}
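To make the 128-bit layout concrete, here is a worked round-trip using the same shifts as `PackedBin::pack` and `unpack` above, with made-up values:

```rust
fn main() {
    // epoch = 3, count = 2 samples, 1_500_000 ns (1.5 ms) of total duration
    let (epoch, count, nanos): (u32, u32, u64) = (3, 2, 1_500_000);

    // pack: epoch in bits 96..128, count in bits 64..96, nanos in bits 0..64
    let packed: u128 = ((epoch as u128) << 96) | ((count as u128) << 64) | (nanos as u128);

    // unpack recovers each field by shifting and truncating
    assert_eq!((packed >> 96) as u32, 3);
    assert_eq!((packed >> 64) as u32, 2);
    assert_eq!(packed as u64, 1_500_000);
}
```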
+
+/// Lock-free moving statistics using an epoch-based ring buffer.
+///
+/// This is a thread-safe, lock-free alternative to `MovingStats` that uses
+/// atomic operations instead of locks. It tracks durations over a sliding
+/// time window, storing values in fixed-size bins.
+///
+/// Writers use CAS loops to atomically update bins, while readers can
+/// scan all bins without blocking writers.
+pub struct AtomicMovingStats {
+    start_time: Instant,
+    bin_size: Duration,
+    bins: Box<[PackedBin]>,
+}
+
+impl Default for AtomicMovingStats {
+    fn default() -> Self {
+        Self::new(ENV_VARS.load_window_size, ENV_VARS.load_bin_size)
+    }
+}
+
+impl AtomicMovingStats {
+    /// Create a new AtomicMovingStats with the given window and bin sizes.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `window_size` or `bin_size` is `0`, or if `bin_size` >= `window_size`.
+    pub fn new(window_size: Duration, bin_size: Duration) -> Self {
+        assert!(window_size.as_millis() > 0);
+        assert!(bin_size.as_millis() > 0);
+        assert!(window_size > bin_size);
+
+        let num_bins = window_size.as_millis() as usize / bin_size.as_millis() as usize;
+        let bins: Vec<PackedBin> = (0..num_bins).map(|_| PackedBin::new()).collect();
+
+        Self {
+            start_time: Instant::now(),
+            bin_size,
+            bins: bins.into_boxed_slice(),
+        }
+    }
+
+    /// Calculate the epoch number for a given instant.
+    fn epoch_at(&self, now: Instant) -> u32 {
+        let elapsed = now.saturating_duration_since(self.start_time);
+        (elapsed.as_millis() / self.bin_size.as_millis()) as u32
+    }
+
+    /// Add a duration measurement at the current time.
+    pub fn add(&self, duration: Duration) {
+        self.add_at(Instant::now(), duration);
+    }
+
+    /// Add a duration measurement at a specific time.
+    ///
+    /// Note: It is expected that subsequent calls to `add_at` happen with
+    /// monotonically increasing `now` values for optimal accuracy.
+    pub fn add_at(&self, now: Instant, duration: Duration) {
+        let current_epoch = self.epoch_at(now);
+        let bin_idx = current_epoch as usize % self.bins.len();
+        let bin = &self.bins[bin_idx];
+        let duration_nanos = duration.as_nanos() as u64;
+
+        loop {
+            let current = bin.0.load(Ordering::Acquire);
+            let (bin_epoch, count, total_nanos) = PackedBin::unpack(current);
+
+            let new_packed = if bin_epoch == current_epoch {
+                // Same epoch: increment existing values
+                PackedBin::pack(
+                    current_epoch,
+                    count.saturating_add(1),
+                    total_nanos.saturating_add(duration_nanos),
+                )
+            } else {
+                // Stale epoch: reset bin with new measurement
+                PackedBin::pack(current_epoch, 1, duration_nanos)
+            };
+
+            match bin.0.compare_exchange_weak(
+                current,
+                new_packed,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => break,
+                Err(_) => continue, // CAS failed, retry
+            }
+        }
+    }
+
+    /// Calculate the average duration over the current window.
+    ///
+    /// Returns `None` if no measurements have been recorded in the window.
+    pub fn average(&self) -> Option<Duration> {
+        self.average_at(Instant::now())
+    }
+
+    /// Calculate the average duration at a specific time.
+    fn average_at(&self, now: Instant) -> Option<Duration> {
+        let current_epoch = self.epoch_at(now);
+        let num_bins = self.bins.len() as u32;
+        let mut total_count: u64 = 0;
+        let mut total_nanos: u128 = 0;
+
+        for bin in self.bins.iter() {
+            let (bin_epoch, count, duration_nanos) =
+                PackedBin::unpack(bin.0.load(Ordering::Acquire));
+            // Valid if within window (handles epoch wraparound)
+            if current_epoch.wrapping_sub(bin_epoch) < num_bins {
+                total_count += count as u64;
+                total_nanos += duration_nanos as u128;
+            }
+        }
+
+        if total_count > 0 {
+            Some(Duration::from_nanos(
+                (total_nanos / total_count as u128) as u64,
+            ))
+        } else {
+            None
+        }
+    }
+
+    /// Return `true` if the average of measurements within the window
+    /// is above `duration`.
+    pub fn average_gt(&self, duration: Duration) -> bool {
+        self.average().map(|avg| avg > duration).unwrap_or(false)
+    }
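For orientation, a hypothetical usage sketch of the API so far: several writer threads record wait times through CAS updates while a reader computes the average, with no lock held anywhere. The call sites are invented for illustration; only the `AtomicMovingStats` API itself is taken from this PR:

```rust
use std::sync::Arc;
use std::time::Duration;

// Assumes the type and re-export added in this PR.
use graph::util::stats::AtomicMovingStats;

fn main() {
    let stats = Arc::new(AtomicMovingStats::new(
        Duration::from_secs(30), // window
        Duration::from_secs(1),  // bin size
    ));

    // Many writers can record wait times concurrently; each `add`
    // is a CAS loop on a single bin, so no writer blocks a reader.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let stats = stats.clone();
            std::thread::spawn(move || {
                for _ in 0..100 {
                    stats.add(Duration::from_millis(5));
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Readers scan the bins without taking a lock.
    assert!(stats.average().is_some());
}
```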
+
+    /// Return `true` if the average at a specific time is above `duration`.
+    #[cfg(test)]
+    fn average_gt_at(&self, now: Instant, duration: Duration) -> bool {
+        self.average_at(now)
+            .map(|avg| avg > duration)
+            .unwrap_or(false)
+    }
+
+    /// Return the total duration recorded in the current window.
+    pub fn duration(&self) -> Duration {
+        self.duration_at(Instant::now())
+    }
+
+    /// Return the total duration at a specific time.
+    fn duration_at(&self, now: Instant) -> Duration {
+        let current_epoch = self.epoch_at(now);
+        let num_bins = self.bins.len() as u32;
+        let mut total_nanos: u128 = 0;
+
+        for bin in self.bins.iter() {
+            let (bin_epoch, _, duration_nanos) = PackedBin::unpack(bin.0.load(Ordering::Acquire));
+            if current_epoch.wrapping_sub(bin_epoch) < num_bins {
+                total_nanos += duration_nanos as u128;
+            }
+        }
+
+        Duration::from_nanos(total_nanos as u64)
+    }
+
+    /// Adds `duration` to the stats, and registers the average in ms with `avg_gauge`.
+    pub fn add_and_register(&self, duration: Duration, avg_gauge: &Gauge) {
+        self.add(duration);
+        let wait_avg = self.average().map(|avg| avg.as_millis()).unwrap_or(0);
+        avg_gauge.set(wait_avg as f64);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -219,4 +411,77 @@ mod tests {
         assert_eq!(20, stats.total.count);
         assert_eq!(Duration::from_secs(5 * 86 + 16 * 10), stats.total.duration);
     }
+
+    #[test]
+    fn atomic_add_one_const() {
+        let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
+        let start = stats.start_time;
+        for i in 0..10 {
+            stats.add_at(start + Duration::from_secs(i), Duration::from_secs(1));
+        }
+        // After 10 seconds with 5-second window, only last 5 entries are valid
+        assert_eq!(5, stats.bins.len());
+        // Query at time 10 seconds (end of data range)
+        let query_time = start + Duration::from_secs(10);
+        // Average should be 1 second
+        let avg = stats.average_at(query_time).unwrap();
+        assert_eq!(Duration::from_secs(1), avg);
+        assert!(stats.average_gt_at(query_time, Duration::from_millis(900)));
+        assert!(!stats.average_gt_at(query_time, Duration::from_secs(1)));
+    }
+
+    #[test]
+    fn atomic_add_four_linear() {
+        let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
+        let start = stats.start_time;
+        for i in 0..40u64 {
+            stats.add_at(
+                start + Duration::from_millis(250 * i),
+                Duration::from_secs(i),
+            );
+        }
+        assert_eq!(5, stats.bins.len());
+        // Query at time 9.999 seconds (just before epoch 10 to include epoch 5)
+        // At epoch 9, valid bins are epochs 5-9 (9 - bin_epoch < 5)
+        let query_time = start + Duration::from_millis(9999);
+        // Total duration in window: 4 entries per bin, bins 5-9 contain entries 20-39
+        // Bin 5: entries 20,21,22,23 -> sum = 86
+        // Bin 6: entries 24,25,26,27 -> sum = 102
+        // ...
+        // Total count = 20, total duration = 5*86 + 16*10 = 590
+        assert_eq!(
+            Duration::from_secs(5 * 86 + 16 * 10),
+            stats.duration_at(query_time)
+        );
+    }
+
+    #[test]
+    fn atomic_empty_average() {
+        let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
+        // Query at start_time (no data yet)
+        assert!(stats.average_at(stats.start_time).is_none());
+        assert!(!stats.average_gt_at(stats.start_time, Duration::from_secs(1)));
+    }
+
+    #[test]
+    fn atomic_pack_unpack() {
+        // Test edge cases of packing/unpacking
+        let packed = PackedBin::pack(u32::MAX, u32::MAX, u64::MAX);
+        let (epoch, count, nanos) = PackedBin::unpack(packed);
+        assert_eq!(u32::MAX, epoch);
+        assert_eq!(u32::MAX, count);
+        assert_eq!(u64::MAX, nanos);
+
+        let packed = PackedBin::pack(0, 0, 0);
+        let (epoch, count, nanos) = PackedBin::unpack(packed);
+        assert_eq!(0, epoch);
+        assert_eq!(0, count);
+        assert_eq!(0, nanos);
+
+        let packed = PackedBin::pack(12345, 67890, 123456789012345);
+        let (epoch, count, nanos) = PackedBin::unpack(packed);
+        assert_eq!(12345, epoch);
+        assert_eq!(67890, count);
+        assert_eq!(123456789012345, nanos);
+    }
 }
diff --git a/graph/src/util/timed_cache.rs b/graph/src/util/timed_cache.rs
index 8f64c844630..587a49a176a 100644
--- a/graph/src/util/timed_cache.rs
+++ b/graph/src/util/timed_cache.rs
@@ -3,10 +3,12 @@ use std::{
     cmp::Eq,
     collections::HashMap,
     hash::Hash,
-    sync::{Arc, RwLock},
+    sync::Arc,
     time::{Duration, Instant},
 };
 
+use crate::parking_lot::RwLock;
+
 /// Caching of values for a specified amount of time
 #[derive(Debug)]
 struct CacheEntry<V> {
@@ -49,7 +51,7 @@ impl<K, V> TimedCache<K, V> {
         K: Borrow<Q> + Eq + Hash,
         Q: Hash + Eq + ?Sized,
     {
-        match self.entries.read().unwrap().get(key) {
+        match self.entries.read().get(key) {
             Some(CacheEntry { value, expires }) if expires >= &now => Some(value.clone()),
             _ => None,
         }
@@ -72,11 +74,11 @@ impl<K, V> TimedCache<K, V> {
             value,
             expires: now + self.ttl,
         };
-        self.entries.write().unwrap().insert(key, entry);
+        self.entries.write().insert(key, entry);
     }
 
     pub fn clear(&self) {
-        self.entries.write().unwrap().clear();
+        self.entries.write().clear();
     }
 
     pub fn find<F>(&self, pred: F) -> Option<Arc<V>>
     where
@@ -85,7 +87,6 @@ impl<K, V> TimedCache<K, V> {
     {
         self.entries
             .read()
-            .unwrap()
             .values()
             .find(move |entry| pred(entry.value.as_ref()))
             .map(|entry| entry.value.clone())
     }
@@ -101,7 +102,6 @@ impl<K, V> TimedCache<K, V> {
     {
         self.entries
            .write()
-            .unwrap()
             .remove(key)
             .map(|CacheEntry { value, expires }| (value, expires >= Instant::now()))
     }
diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs
index 674c274ac5c..48ea768c033 100644
--- a/store/postgres/src/block_store.rs
+++ b/store/postgres/src/block_store.rs
@@ -1,8 +1,6 @@
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use graph::parking_lot::RwLock;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -321,7 +319,6 @@ impl BlockStore {
         let configured_chains = block_store
             .stores
             .read()
-            .unwrap()
             .keys()
             .cloned()
             .collect::<Vec<_>>();
@@ -410,7 +407,6 @@ impl BlockStore {
         let store = Arc::new(store);
         self.stores
             .write()
-            .unwrap()
             .insert(chain.name.clone(), store.clone());
         Ok(store)
     }
@@ -475,12 +471,7 @@ impl BlockStore {
     }
 
     async fn store(&self, chain: &str) -> Option<Arc<ChainStore>> {
-        let store = self
-            .stores
-            .read()
-            .unwrap()
-            .get(chain)
-            .map(CheapClone::cheap_clone);
+        let store = self.stores.read().get(chain).map(CheapClone::cheap_clone);
         if store.is_some() {
             return store;
         }
@@ -506,7 +497,7 @@ impl BlockStore {
         chain_store.drop_chain().await?;
 
-        self.stores.write().unwrap().remove(chain);
+        self.stores.write().remove(chain);
 
         Ok(())
     }
@@ -516,7 +507,6 @@ impl BlockStore {
     fn stores(&self) -> Vec<Arc<ChainStore>> {
         self.stores
             .read()
-            .unwrap()
             .values()
             .map(CheapClone::cheap_clone)
             .collect()
diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs
index cc3b6949fa8..ba2819b7096 100644
--- a/store/postgres/src/chain_store.rs
+++ b/store/postgres/src/chain_store.rs
@@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache;
 
 use std::collections::BTreeMap;
 use std::future::Future;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
 use std::{
     collections::HashMap,
     convert::{TryFrom, TryInto},
@@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics {
     chain_head_cache_latest_block_num: Box<GaugeVec>,
     chain_head_cache_hits: Box<CounterVec>,
     chain_head_cache_misses: Box<CounterVec>,
+    // Metrics for chain_head_ptr() cache
+    chain_head_ptr_cache_hits: Box<CounterVec>,
+    chain_head_ptr_cache_misses: Box<CounterVec>,
+    chain_head_ptr_cache_block_time_ms: Box<GaugeVec>,
 }
 
 impl ChainStoreMetrics {
@@ -1914,12 +1920,37 @@ impl ChainStoreMetrics {
             )
             .expect("Can't register the counter");
 
+        let chain_head_ptr_cache_hits = registry
+            .new_counter_vec(
+                "chain_head_ptr_cache_hits",
+                "Number of times the chain_head_ptr cache was hit",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the counter");
+        let chain_head_ptr_cache_misses = registry
+            .new_counter_vec(
+                "chain_head_ptr_cache_misses",
+                "Number of times the chain_head_ptr cache was missed",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the counter");
+        let chain_head_ptr_cache_block_time_ms = registry
+            .new_gauge_vec(
+                "chain_head_ptr_cache_block_time_ms",
+                "Estimated block time in milliseconds used for adaptive cache TTL",
+                vec!["network".to_string()],
+            )
+            .expect("Can't register the gauge");
+
         Self {
             chain_head_cache_size,
             chain_head_cache_oldest_block_num,
             chain_head_cache_latest_block_num,
             chain_head_cache_hits,
             chain_head_cache_misses,
+            chain_head_ptr_cache_hits,
+            chain_head_ptr_cache_misses,
+            chain_head_ptr_cache_block_time_ms,
         }
     }
@@ -1959,6 +1990,143 @@ impl ChainStoreMetrics {
             .unwrap()
             .inc_by(misses as f64);
     }
+
+    pub fn record_chain_head_ptr_cache_hit(&self, network: &str) {
+        self.chain_head_ptr_cache_hits
+            .with_label_values(&[network])
+            .inc();
+    }
+
+    pub fn record_chain_head_ptr_cache_miss(&self, network: &str) {
+        self.chain_head_ptr_cache_misses
+            .with_label_values(&[network])
+            .inc();
+    }
+
+    pub fn set_chain_head_ptr_block_time(&self, network: &str, block_time_ms: u64) {
+        self.chain_head_ptr_cache_block_time_ms
+            .with_label_values(&[network])
+            .set(block_time_ms as f64);
+    }
+}
+
+const MIN_TTL_MS: u64 = 20;
+const MAX_TTL_MS: u64 = 2000;
+const MIN_OBSERVATIONS: u64 = 5;
+
+/// Adaptive cache for chain_head_ptr() that learns optimal TTL from block frequency.
+struct ChainHeadPtrCache {
+    /// Cached value and when it expires
+    entry: RwLock<Option<(BlockPtr, Instant)>>,
+    /// Estimated milliseconds between blocks (EWMA)
+    estimated_block_time_ms: AtomicU64,
+    /// When we last observed the chain head change
+    last_change: RwLock<Instant>,
+    /// Number of block changes observed (for warmup)
+    observations: AtomicU64,
+    /// Metrics for recording cache hits/misses
+    metrics: Arc<ChainStoreMetrics>,
+    /// Chain name for metric labels
+    chain: String,
+}
+
+impl ChainHeadPtrCache {
+    fn new(metrics: Arc<ChainStoreMetrics>, chain: String) -> Self {
+        Self {
+            entry: RwLock::new(None),
+            estimated_block_time_ms: AtomicU64::new(0),
+            last_change: RwLock::new(Instant::now()),
+            observations: AtomicU64::new(0),
+            metrics,
+            chain,
+        }
+    }
+
+    /// Returns cached value if still valid, or None if cache is disabled/missed.
+    /// Records hit/miss metrics automatically.
+    fn get(&self) -> Option<BlockPtr> {
+        if ENV_VARS.store.disable_chain_head_ptr_cache {
+            return None;
+        }
+        let guard = self.entry.read();
+        if let Some((value, expires)) = guard.as_ref() {
+            if Instant::now() < *expires {
+                self.metrics.record_chain_head_ptr_cache_hit(&self.chain);
+                return Some(value.clone());
+            }
+        }
+        self.metrics.record_chain_head_ptr_cache_miss(&self.chain);
+        None
+    }
+
+    /// Compute current TTL - MIN_TTL during warmup, then 1/4 of estimated block time
+    fn current_ttl(&self) -> Duration {
+        let obs = AtomicU64::load(&self.observations, Ordering::Relaxed);
+        if obs < MIN_OBSERVATIONS {
+            return Duration::from_millis(MIN_TTL_MS);
+        }
+
+        let block_time = AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed);
+        let ttl_ms = (block_time / 4).clamp(MIN_TTL_MS, MAX_TTL_MS);
+        Duration::from_millis(ttl_ms)
+    }
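Given the constants above, `current_ttl` settles at roughly a quarter of the estimated block time, clamped to the [20ms, 2s] band. A standalone sketch of the arithmetic, with made-up block times:

```rust
fn ttl_ms(estimated_block_time_ms: u64) -> u64 {
    const MIN_TTL_MS: u64 = 20;
    const MAX_TTL_MS: u64 = 2000;
    // Same rule as current_ttl once warmup is over.
    (estimated_block_time_ms / 4).clamp(MIN_TTL_MS, MAX_TTL_MS)
}

fn main() {
    assert_eq!(ttl_ms(12_000), 2000); // ~12s blocks: quarter is 3s, capped at 2s
    assert_eq!(ttl_ms(400), 100);     // ~400ms blocks: 100ms TTL
    assert_eq!(ttl_ms(40), 20);       // very fast chain: floored at 20ms
}
```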
+
+    /// Cache a new value, updating block time estimate if value changed.
+    /// Does nothing if cache is disabled.
+    fn set(&self, new_value: BlockPtr) {
+        if ENV_VARS.store.disable_chain_head_ptr_cache {
+            return;
+        }
+        let now = Instant::now();
+
+        // Check if block changed
+        let old_value = {
+            let guard = self.entry.read();
+            guard.as_ref().map(|(v, _)| v.clone())
+        };
+
+        // Only update estimate if we have a previous value and block number advanced
+        // (skip reorgs where new block number <= old)
+        if let Some(old_ptr) = old_value.as_ref() {
+            if new_value.number > old_ptr.number {
+                let mut last_change = self.last_change.write();
+                let delta_ms = now.duration_since(*last_change).as_millis() as u64;
+                *last_change = now;
+
+                let blocks_advanced = (new_value.number - old_ptr.number) as u64;
+
+                // Increment observation count
+                let obs = AtomicU64::fetch_add(&self.observations, 1, Ordering::Relaxed);
+
+                // Ignore unreasonable deltas (> 60s)
+                if delta_ms > 0 && delta_ms < 60_000 {
+                    let per_block_ms = delta_ms / blocks_advanced;
+                    let new_estimate = if obs == 0 {
+                        // First observation - use as initial estimate
+                        per_block_ms
+                    } else {
+                        // EWMA: new = 0.8 * old + 0.2 * observed
+                        let old_estimate =
+                            AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed);
+                        (old_estimate * 4 + per_block_ms) / 5
+                    };
+                    AtomicU64::store(
+                        &self.estimated_block_time_ms,
+                        new_estimate,
+                        Ordering::Relaxed,
+                    );
+
+                    // Update metric gauge
+                    self.metrics
+                        .set_chain_head_ptr_block_time(&self.chain, new_estimate);
+                }
+            }
+        }
+
+        // Compute TTL and store with expiry
+        let ttl = self.current_ttl();
+        *self.entry.write() = Some((new_value, now + ttl));
+    }
 }
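The update rule in `set` weights history four to one over the newest sample, so a single outlier moves the estimate by at most 20% while repeated observations converge geometrically. A worked example of the same arithmetic, with made-up numbers:

```rust
fn main() {
    // EWMA update used above: new = (old * 4 + observed) / 5
    let old_estimate: u64 = 12_000; // previously estimated 12s block time
    let per_block_ms: u64 = 2_000;  // one fast 2s observation

    let new_estimate = (old_estimate * 4 + per_block_ms) / 5;
    assert_eq!(new_estimate, 10_000); // moves 20% of the way toward 2s
}
```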
 
@@ -1980,6 +2148,10 @@ pub struct ChainStore {
     blocks_by_number_cache: HerdCache>, StoreError>>>,
     ancestor_cache: HerdCache, StoreError>>>,
+    /// Adaptive cache for chain_head_ptr()
+    chain_head_ptr_cache: ChainHeadPtrCache,
+    /// Herd cache to prevent thundering herd on chain_head_ptr() lookups
+    chain_head_ptr_herd: HerdCache<Arc<Result<Option<BlockPtr>, StoreError>>>,
 }
 
 impl ChainStore {
@@ -1994,10 +2166,12 @@ impl ChainStore {
         metrics: Arc<ChainStoreMetrics>,
     ) -> Self {
         let recent_blocks_cache =
-            RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics);
+            RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone());
         let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain));
         let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
         let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
+        let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone());
+        let chain_head_ptr_herd = HerdCache::new(format!("chain_{}_head_ptr", chain));
         ChainStore {
             logger,
             pool,
@@ -2009,6 +2183,8 @@ impl ChainStore {
             blocks_by_hash_cache,
             blocks_by_number_cache,
             ancestor_cache,
+            chain_head_ptr_cache,
+            chain_head_ptr_herd,
         }
     }
@@ -2351,31 +2527,55 @@ impl ChainHeadStore for ChainStore {
     async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
         use public::ethereum_networks::dsl::*;
 
-        let mut conn = self.pool.get_permitted().await?;
-        Ok(ethereum_networks
-            .select((head_block_hash, head_block_number))
-            .filter(name.eq(&self.chain))
-            .load::<(Option<String>, Option<i64>)>(&mut conn)
-            .await
-            .map(|rows| {
-                rows.as_slice()
-                    .first()
-                    .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) {
-                        (Some(hash), Some(number)) => Some(
-                            (
-                                // FIXME:
-                                //
-                                // workaround for arweave
-                                H256::from_slice(&hex::decode(hash).unwrap()[..32]),
-                                *number,
-                            )
-                                .into(),
-                        ),
-                        (None, None) => None,
-                        _ => unreachable!(),
-                    })
-                    .and_then(|opt: Option<BlockPtr>| opt)
-            })?)
+        // Check TTL cache first (handles disabled check and metrics internally)
+        if let Some(cached) = self.chain_head_ptr_cache.get() {
+            return Ok(Some(cached));
+        }
+
+        // Use HerdCache to ensure only one caller does the DB lookup
+        // when cache is expired. Other callers await the in-flight query.
+        let pool = self.pool.clone();
+        let chain = self.chain.clone();
+        let lookup = async move {
+            let mut conn = pool.get_permitted().await?;
+            ethereum_networks
+                .select((head_block_hash, head_block_number))
+                .filter(name.eq(&chain))
+                .load::<(Option<String>, Option<i64>)>(&mut conn)
+                .await
+                .map(|rows| {
+                    rows.as_slice()
+                        .first()
+                        .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) {
+                            (Some(hash), Some(number)) => Some(
+                                (
+                                    // FIXME:
+                                    //
+                                    // workaround for arweave
+                                    H256::from_slice(&hex::decode(hash).unwrap()[..32]),
+                                    *number,
+                                )
+                                    .into(),
+                            ),
+                            (None, None) => None,
+                            _ => unreachable!(),
+                        })
+                        .and_then(|opt: Option<BlockPtr>| opt)
+                })
+                .map_err(StoreError::from)
+        };
+
+        let (result, _cached) = self
+            .cached_lookup(&self.chain_head_ptr_herd, &self.chain, lookup)
+            .await;
+
+        // Update TTL cache with the result
+        // (set() handles disabled check internally)
+        if let Ok(Some(ref ptr)) = result {
+            self.chain_head_ptr_cache.set(ptr.clone());
+        }
+
+        result.map_err(Error::from)
     }
 
     async fn chain_head_cursor(&self) -> Result<Option<String>, Error> {
diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs
index 4677ea6276b..dbbcb288b83 100644
--- a/store/postgres/src/pool/manager.rs
+++ b/store/postgres/src/pool/manager.rs
@@ -10,20 +10,20 @@ use diesel_async::pooled_connection::{PoolError as DieselPoolError, PoolableConn
 use diesel_async::{AsyncConnection, RunQueryDsl};
 use graph::env::ENV_VARS;
 use graph::prelude::error;
+use graph::prelude::AtomicMovingStats;
 use graph::prelude::Counter;
 use graph::prelude::Gauge;
 use graph::prelude::MetricsRegistry;
-use graph::prelude::MovingStats;
 use graph::prelude::PoolWaitStats;
 use graph::slog::info;
 use graph::slog::Logger;
 use std::collections::HashMap;
 use std::sync::atomic::AtomicBool;
+use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::sync::RwLock;
-use std::time::{Duration, Instant};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use crate::pool::AsyncPool;
@@ -36,6 +36,8 @@ pub struct ConnectionManager {
     connection_url: String,
     state_tracker: StateTracker,
     error_counter: Counter,
+    /// Connections idle for less than this threshold skip the SELECT 67 health check
+    validation_idle_threshold: Duration,
 }
 
 impl ConnectionManager {
@@ -45,6 +47,7 @@ impl ConnectionManager {
         state_tracker: StateTracker,
         registry: &MetricsRegistry,
         const_labels: HashMap<String, String>,
+        validation_idle_threshold: Duration,
     ) -> Self {
         let error_counter = registry
             .global_counter(
@@ -59,6 +62,7 @@ impl ConnectionManager {
             connection_url,
             state_tracker,
             error_counter,
+            validation_idle_threshold,
         }
     }
@@ -106,11 +110,20 @@ impl deadpool::managed::Manager for ConnectionManager {
     async fn recycle(
         &self,
         obj: &mut Self::Type,
-        _metrics: &deadpool::managed::Metrics,
+        metrics: &deadpool::managed::Metrics,
     ) -> RecycleResult<Self::Error> {
         if std::thread::panicking() || obj.is_broken() {
             return Err(RecycleError::Message("Broken connection".into()));
         }
+
+        // Skip health check if connection was used recently
+        if self.validation_idle_threshold > Duration::ZERO
+            && metrics.last_used() < self.validation_idle_threshold
+        {
+            return Ok(());
+        }
+
+        // Run SELECT 67 only for idle connections
         let res = diesel::select(67_i32.into_sql::<Integer>())
             .execute(obj)
             .await
@@ -129,6 +142,9 @@ pub(super) struct StateTracker {
     logger: Logger,
     available: Arc<AtomicBool>,
     ignore_timeout: Arc<AtomicBool>,
+    /// Timestamp (as millis since UNIX_EPOCH) when we can next probe.
+    /// 0 means available/no limit.
+    next_probe_at: Arc<AtomicU64>,
 }
 
 impl StateTracker {
@@ -137,14 +153,16 @@ impl StateTracker {
             logger,
             available: Arc::new(AtomicBool::new(true)),
             ignore_timeout: Arc::new(AtomicBool::new(false)),
+            next_probe_at: Arc::new(AtomicU64::new(0)),
         }
     }
 
     pub(super) fn mark_available(&self) {
         if !self.is_available() {
-            info!(self.logger, "Conection checkout"; "event" => "available");
+            info!(self.logger, "Connection checkout"; "event" => "available");
         }
         self.available.store(true, Ordering::Relaxed);
+        self.next_probe_at.store(0, Ordering::Relaxed);
     }
 
     pub(super) fn mark_unavailable(&self, waited: Duration) {
@@ -159,10 +177,47 @@ impl StateTracker {
             }
         }
         self.available.store(false, Ordering::Relaxed);
+
+        // Set next probe time
+        let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64;
+        let next_probe = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64
+            + retry_interval;
+        self.next_probe_at.store(next_probe, Ordering::Relaxed);
     }
 
     pub(super) fn is_available(&self) -> bool {
-        AtomicBool::load(&self.available, Ordering::Relaxed)
+        if AtomicBool::load(&self.available, Ordering::Relaxed) {
+            return true;
+        }
+
+        // Allow one probe through every `connection_unavailable_retry` interval
+        let next_probe = AtomicU64::load(&self.next_probe_at, Ordering::Relaxed);
+        let now_millis = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
+
+        if now_millis >= next_probe {
+            // Try to claim this probe slot with CAS
+            let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64;
+            if self
+                .next_probe_at
+                .compare_exchange(
+                    next_probe,
+                    now_millis + retry_interval,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                )
+                .is_ok()
+            {
+                // We claimed the probe - allow this request through
+                return true;
+            }
+        }
+        false
     }
 
     pub(super) fn timeout_is_ignored(&self) -> bool {
@@ -297,7 +352,7 @@ impl WaitMeter {
                 const_labels,
             )
             .expect("failed to create `store_connection_wait_time_ms` counter");
-        let wait_stats = Arc::new(RwLock::new(MovingStats::default()));
+        let wait_stats = Arc::new(AtomicMovingStats::default());
 
         Self {
             wait_gauge,
@@ -306,9 +361,6 @@ impl WaitMeter {
     }
 
     pub(crate) fn add_conn_wait_time(&self, duration: Duration) {
-        self.wait_stats
-            .write()
-            .unwrap()
-            .add_and_register(duration, &self.wait_gauge);
+        self.wait_stats.add_and_register(duration, &self.wait_gauge);
    }
 }
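The CAS in `is_available` is what enforces the one-probe-per-interval contract documented in `environment-variables.md`: every waiter reads the same `next_probe_at`, but only one can swap it forward; the rest fail fast. A self-contained sketch of that claim step, using a standalone function rather than the `StateTracker` type itself and an assumed 2s retry interval:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns true for exactly one caller per retry interval.
fn try_claim_probe(next_probe_at: &AtomicU64, retry_interval_ms: u64) -> bool {
    let next_probe = next_probe_at.load(Ordering::Relaxed);
    let now_millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    // Only the caller whose compare_exchange succeeds gets to probe;
    // everyone else sees a moved deadline and fails instantly.
    now_millis >= next_probe
        && next_probe_at
            .compare_exchange(
                next_probe,
                now_millis + retry_interval_ms,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
}

fn main() {
    let next_probe_at = AtomicU64::new(0);
    // First caller claims the probe slot; an immediate second caller does not.
    assert!(try_claim_probe(&next_probe_at, 2000));
    assert!(!try_claim_probe(&next_probe_at, 2000));
}
```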
diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs
index 20d332616a2..afb0aef4ebf 100644
--- a/store/postgres/src/pool/mod.rs
+++ b/store/postgres/src/pool/mod.rs
@@ -11,7 +11,7 @@ use graph::derive::CheapClone;
 use graph::internal_error;
 use graph::prelude::tokio::time::Instant;
 use graph::prelude::{
-    anyhow::anyhow, crit, debug, error, info, o, Gauge, Logger, MovingStats, PoolWaitStats,
+    anyhow::anyhow, crit, debug, error, info, o, AtomicMovingStats, Gauge, Logger, PoolWaitStats,
     StoreError, ENV_VARS,
 };
 use graph::prelude::{tokio, MetricsRegistry};
@@ -19,11 +19,11 @@ use graph::slog::warn;
 use graph::util::timed_rw_lock::TimedMutex;
 use tokio::sync::OwnedSemaphorePermit;
 
+use std::collections::HashMap;
 use std::fmt::{self};
 use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::time::Duration;
-use std::{collections::HashMap, sync::RwLock};
 
 use crate::catalog;
 use crate::pool::manager::{ConnectionManager, WaitMeter};
@@ -433,7 +433,7 @@ pub struct PoolInner {
     // that waiting queries consume few resources. Still this is placed here because the semaphore
     // is sized acording to the DB connection pool size.
     query_semaphore: Arc<tokio::sync::Semaphore>,
-    semaphore_wait_stats: Arc<RwLock<MovingStats>>,
+    semaphore_wait_stats: Arc<AtomicMovingStats>,
     semaphore_wait_gauge: Box<Gauge>,
 
     // Limits concurrent indexing operations to prevent pool exhaustion
@@ -442,7 +442,7 @@ pub struct PoolInner {
     // during diesel-async migration. It also avoids timeouts because of
     // pool exhaustion when getting a connection.
     indexing_semaphore: Arc<tokio::sync::Semaphore>,
-    indexing_semaphore_wait_stats: Arc<RwLock<MovingStats>>,
+    indexing_semaphore_wait_stats: Arc<AtomicMovingStats>,
     indexing_semaphore_wait_gauge: Box<Gauge>,
 }
@@ -476,6 +476,7 @@ impl PoolInner {
             state_tracker.clone(),
             &registry,
             const_labels.clone(),
+            ENV_VARS.store.connection_validation_idle_secs,
         );
 
         let timeouts = Timeouts {
@@ -558,11 +559,11 @@ impl PoolInner {
             fdw_pool,
             wait_meter,
             state_tracker,
-            semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())),
+            semaphore_wait_stats: Arc::new(AtomicMovingStats::default()),
             query_semaphore,
             semaphore_wait_gauge,
             indexing_semaphore,
-            indexing_semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())),
+            indexing_semaphore_wait_stats: Arc::new(AtomicMovingStats::default()),
             indexing_semaphore_wait_gauge,
         }
     }
@@ -719,8 +720,6 @@ impl PoolInner {
         let start = Instant::now();
         let permit = self.query_semaphore.cheap_clone().acquire_owned().await;
         self.semaphore_wait_stats
-            .write()
-            .unwrap()
             .add_and_register(start.elapsed(), &self.semaphore_wait_gauge);
         permit.unwrap()
     }
@@ -733,8 +732,6 @@ impl PoolInner {
         let permit = self.indexing_semaphore.cheap_clone().acquire_owned().await;
         let elapsed = start.elapsed();
         self.indexing_semaphore_wait_stats
-            .write()
-            .unwrap()
             .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge);
         (permit.unwrap(), elapsed)
     }
diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs
index 6189120f602..572c3a339a3 100644
--- a/store/postgres/src/store_events.rs
+++ b/store/postgres/src/store_events.rs
@@ -2,7 +2,9 @@ use graph::futures01::Stream;
 use graph::futures03::compat::Stream01CompatExt;
 use graph::futures03::stream::StreamExt;
 use graph::futures03::TryStreamExt;
-use std::sync::{atomic::Ordering, Arc, RwLock};
+use std::sync::{atomic::Ordering, Arc};
+
+use graph::parking_lot::RwLock;
 use std::{collections::HashMap, sync::atomic::AtomicUsize};
 use tokio::sync::mpsc::{channel, Sender};
 use tokio_stream::wrappers::ReceiverStream;
@@ -127,7 +129,7 @@ impl SubscriptionManager {
 
         // Send to `subscriptions`.
         {
-            let senders = subscriptions.read().unwrap().clone();
+            let senders = subscriptions.read().clone();
 
             // Write change to all matching subscription streams; remove subscriptions
             // whose receiving end has been dropped
@@ -138,7 +140,7 @@ impl SubscriptionManager {
                         "Failed to send store event to subscriber {}: {}",
                         id, e
                     );
-                    subscriptions.write().unwrap().remove(&id);
+                    subscriptions.write().remove(&id);
                 }
             }
         }
@@ -187,7 +189,7 @@ impl SubscriptionManager {
 
             // Cleanup `subscriptions`.
             {
-                let mut subscriptions = subscriptions.write().unwrap();
+                let mut subscriptions = subscriptions.write();
 
                 // Obtain IDs of subscriptions whose receiving end has gone
                 let stale_ids = subscriptions
@@ -218,7 +220,7 @@ impl SubscriptionManagerTrait for SubscriptionManager {
         let (sender, receiver) = channel(100);
 
         // Add the new subscription
-        self.subscriptions.write().unwrap().insert(id, sender);
+        self.subscriptions.write().insert(id, sender);
 
         // Return the subscription ID and entity change stream
         ReceiverStream::new(receiver)
diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index ff5ffb2d45b..928cbdbe76f 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -1,7 +1,9 @@
 use std::collections::BTreeSet;
 use std::ops::{Deref, Range};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Mutex, RwLock, TryLockError as RwLockError};
+use std::sync::Mutex;
+
+use graph::parking_lot::RwLock;
 use std::time::Instant;
 use std::{collections::BTreeMap, sync::Arc};
 
@@ -574,7 +576,7 @@ impl BlockTracker {
         // processed.
         let res = queue.find_map(|req| match req.as_ref() {
             Request::Write { batch, .. } => {
-                let batch = batch.read().unwrap();
+                let batch = batch.read();
                 tracker.write(&batch.block_ptr);
                 if batch.first_block <= tracker.revert {
                     let res = f(batch.deref(), tracker.revert);
@@ -613,7 +615,7 @@ impl BlockTracker {
         let accum = queue.fold(init, |accum, req| {
             match req.as_ref() {
                 Request::Write { batch, .. } => {
-                    let batch = batch.read().unwrap();
+                    let batch = batch.read();
                     let mut accum = accum;
                     tracker.write(&batch.block_ptr);
                     if batch.first_block <= tracker.revert {
@@ -740,7 +742,7 @@ impl std::fmt::Debug for Request {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Write { batch, store, .. } => {
-                let batch = batch.read().unwrap();
+                let batch = batch.read();
                 write!(
                     f,
                     "write[{}, {:p}, {} entities]",
@@ -811,7 +813,7 @@ impl Request {
             } => {
                 let start = Instant::now();
 
-                let batch = batch.write().unwrap().close();
+                let batch = batch.write().close();
 
                 if let Some(err) = &batch.error {
                     // This can happen when appending to the batch failed
@@ -850,7 +852,7 @@ impl Request {
     fn should_process(&self) -> bool {
         match self {
             Request::Write { queued, batch, .. } => {
-                batch.read().unwrap().weight() >= ENV_VARS.store.write_batch_size
+                batch.read().weight() >= ENV_VARS.store.write_batch_size
                     || queued.elapsed() >= ENV_VARS.store.write_batch_duration
             }
             Request::RevertTo { .. } | Request::Stop => true,
         }
     }
@@ -1169,7 +1171,7 @@ impl Queue {
                     // duration of the write, and we do not want to
                    // slow down queueing requests unnecessarily
                     match existing.try_write() {
-                        Ok(mut existing) => {
+                        Some(mut existing) => {
                             if existing.weight() < ENV_VARS.store.write_batch_size {
                                 let res = existing.append(batch).map(|()| None);
                                 if existing.weight() >= ENV_VARS.store.write_batch_size {
@@ -1180,16 +1182,13 @@ impl Queue {
                                 Ok(Some(batch))
                             }
                         }
-                        Err(RwLockError::WouldBlock) => {
+                        None => {
                             // This branch can cause batches that
                             // are not 'full' at the head of the
                             // queue, something that start_writer
                             // has to take into account
                             Ok(Some(batch))
                         }
-                        Err(RwLockError::Poisoned(e)) => {
-                            panic!("rwlock on batch was poisoned {:?}", e);
-                        }
                     }
                 } else {
                     Ok(Some(batch))