diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings
index 216edbcca..8dd9095d9 100644
--- a/StackExchange.Redis.sln.DotSettings
+++ b/StackExchange.Redis.sln.DotSettings
@@ -12,9 +12,12 @@
True
True
True
+ True
True
True
+ True
True
+ True
True
True
True
diff --git a/src/StackExchange.Redis/ChannelMessage.cs b/src/StackExchange.Redis/ChannelMessage.cs
new file mode 100644
index 000000000..330aedee4
--- /dev/null
+++ b/src/StackExchange.Redis/ChannelMessage.cs
@@ -0,0 +1,64 @@
+namespace StackExchange.Redis;
+
+///
+/// Represents a message that is broadcast via publish/subscribe.
+///
+public readonly struct ChannelMessage
+{
+ // this is *smaller* than storing a RedisChannel for the subscribed channel
+ private readonly ChannelMessageQueue _queue;
+
+ ///
+ /// The Channel:Message string representation.
+ ///
+ public override string ToString() => ((string?)Channel) + ":" + ((string?)Message);
+
+ ///
+ public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode();
+
+ ///
+ public override bool Equals(object? obj) => obj is ChannelMessage cm
+ && cm.Channel == Channel && cm.Message == Message;
+
+ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value)
+ {
+ _queue = queue;
+ _channel = channel;
+ _message = value;
+ }
+
+ ///
+ /// The channel that the subscription was created from.
+ ///
+ public RedisChannel SubscriptionChannel => _queue.Channel;
+
+ private readonly RedisChannel _channel;
+
+ ///
+ /// The channel that the message was broadcast to.
+ ///
+ public RedisChannel Channel => _channel;
+
+ private readonly RedisValue _message;
+
+ ///
+ /// The value that was broadcast.
+ ///
+ public RedisValue Message => _message;
+
+ ///
+ /// Checks if 2 messages are .Equal().
+ ///
+ public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right);
+
+ ///
+ /// Checks if 2 messages are not .Equal().
+ ///
+ public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right);
+
+ ///
+ /// If the channel is either a keyspace or keyevent notification, resolve the key and event type.
+ ///
+ public bool TryParseKeyNotification(out KeyNotification notification)
+ => KeyNotification.TryParse(in _channel, in _message, out notification);
+}
diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs
index e58fb393b..9f962e52a 100644
--- a/src/StackExchange.Redis/ChannelMessageQueue.cs
+++ b/src/StackExchange.Redis/ChannelMessageQueue.cs
@@ -1,385 +1,353 @@
using System;
+using System.Buffers.Text;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
#if NETCOREAPP3_1
+using System.Diagnostics;
using System.Reflection;
#endif
-namespace StackExchange.Redis
+namespace StackExchange.Redis;
+
+///
+/// Represents a message queue of ordered pub/sub notifications.
+///
+///
+/// To create a ChannelMessageQueue, use
+/// or .
+///
+public sealed class ChannelMessageQueue : IAsyncEnumerable
{
+ private readonly Channel _queue;
+
///
- /// Represents a message that is broadcast via publish/subscribe.
+ /// The Channel that was subscribed for this queue.
///
- public readonly struct ChannelMessage
- {
- // this is *smaller* than storing a RedisChannel for the subscribed channel
- private readonly ChannelMessageQueue _queue;
+ public RedisChannel Channel { get; }
- ///
- /// The Channel:Message string representation.
- ///
- public override string ToString() => ((string?)Channel) + ":" + ((string?)Message);
+ private RedisSubscriber? _parent;
- ///
- public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode();
-
- ///
- public override bool Equals(object? obj) => obj is ChannelMessage cm
- && cm.Channel == Channel && cm.Message == Message;
+ ///
+ /// The string representation of this channel.
+ ///
+ public override string? ToString() => (string?)Channel;
- internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value)
- {
- _queue = queue;
- Channel = channel;
- Message = value;
- }
+ ///
+ /// An awaitable task the indicates completion of the queue (including drain of data).
+ ///
+ public Task Completion => _queue.Reader.Completion;
- ///
- /// The channel that the subscription was created from.
- ///
- public RedisChannel SubscriptionChannel => _queue.Channel;
-
- ///
- /// The channel that the message was broadcast to.
- ///
- public RedisChannel Channel { get; }
-
- ///
- /// The value that was broadcast.
- ///
- public RedisValue Message { get; }
-
- ///
- /// Checks if 2 messages are .Equal().
- ///
- public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right);
-
- ///
- /// Checks if 2 messages are not .Equal().
- ///
- public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right);
+ internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
+ {
+ Channel = redisChannel;
+ _parent = parent;
+ _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions);
}
- ///
- /// Represents a message queue of ordered pub/sub notifications.
- ///
- ///
- /// To create a ChannelMessageQueue, use
- /// or .
- ///
- public sealed class ChannelMessageQueue : IAsyncEnumerable
+ private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
{
- private readonly Channel _queue;
+ SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false,
+ };
- ///
- /// The Channel that was subscribed for this queue.
- ///
- public RedisChannel Channel { get; }
- private RedisSubscriber? _parent;
+ private void Write(in RedisChannel channel, in RedisValue value)
+ {
+ var writer = _queue.Writer;
+ writer.TryWrite(new ChannelMessage(this, channel, value));
+ }
- ///
- /// The string representation of this channel.
- ///
- public override string? ToString() => (string?)Channel;
+ ///
+ /// Consume a message from the channel.
+ ///
+ /// The to use.
+ public ValueTask ReadAsync(CancellationToken cancellationToken = default)
+ => _queue.Reader.ReadAsync(cancellationToken);
- ///
- /// An awaitable task the indicates completion of the queue (including drain of data).
- ///
- public Task Completion => _queue.Reader.Completion;
+ ///
+ /// Attempt to synchronously consume a message from the channel.
+ ///
+ /// The read from the Channel.
+ public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item);
- internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
+ ///
+ /// Attempt to query the backlog length of the queue.
+ ///
+ /// The (approximate) count of items in the Channel.
+ public bool TryGetCount(out int count)
+ {
+ // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present
+#if NETCOREAPP3_1
+ // get this using the reflection
+ try
{
- Channel = redisChannel;
- _parent = parent;
- _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions);
+ var prop =
+ _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic);
+ if (prop is not null)
+ {
+ count = (int)prop.GetValue(_queue)!;
+ return true;
+ }
}
-
- private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
+ catch (Exception ex)
{
- SingleWriter = true,
- SingleReader = false,
- AllowSynchronousContinuations = false,
- };
-
- private void Write(in RedisChannel channel, in RedisValue value)
+ Debug.WriteLine(ex.Message); // but ignore
+ }
+#else
+ var reader = _queue.Reader;
+ if (reader.CanCount)
{
- var writer = _queue.Writer;
- writer.TryWrite(new ChannelMessage(this, channel, value));
+ count = reader.Count;
+ return true;
}
+#endif
- ///
- /// Consume a message from the channel.
- ///
- /// The to use.
- public ValueTask ReadAsync(CancellationToken cancellationToken = default)
- => _queue.Reader.ReadAsync(cancellationToken);
-
- ///
- /// Attempt to synchronously consume a message from the channel.
- ///
- /// The read from the Channel.
- public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item);
-
- ///
- /// Attempt to query the backlog length of the queue.
- ///
- /// The (approximate) count of items in the Channel.
- public bool TryGetCount(out int count)
+ count = 0;
+ return false;
+ }
+
+ private Delegate? _onMessageHandler;
+
+ private void AssertOnMessage(Delegate handler)
+ {
+ if (handler == null) throw new ArgumentNullException(nameof(handler));
+ if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null)
+ throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed");
+ }
+
+ ///
+ /// Create a message loop that processes messages sequentially.
+ ///
+ /// The handler to run when receiving a message.
+ public void OnMessage(Action handler)
+ {
+ AssertOnMessage(handler);
+
+ ThreadPool.QueueUserWorkItem(
+ state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this);
+ }
+
+ private async Task OnMessageSyncImpl()
+ {
+ var handler = (Action?)_onMessageHandler;
+ while (!Completion.IsCompleted)
{
- // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present
-#if NETCOREAPP3_1
- // get this using the reflection
+ ChannelMessage next;
try
{
- var prop = _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic);
- if (prop is not null)
- {
- count = (int)prop.GetValue(_queue)!;
- return true;
- }
+ if (!TryRead(out next)) next = await ReadAsync().ForAwait();
}
- catch { }
-#else
- var reader = _queue.Reader;
- if (reader.CanCount)
+ catch (ChannelClosedException) { break; } // expected
+ catch (Exception ex)
{
- count = reader.Count;
- return true;
+ _parent?.multiplexer?.OnInternalError(ex);
+ break;
}
-#endif
- count = default;
- return false;
+ try { handler?.Invoke(next); }
+ catch { } // matches MessageCompletable
}
+ }
- private Delegate? _onMessageHandler;
- private void AssertOnMessage(Delegate handler)
+ internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ {
+ if (queue != null)
{
- if (handler == null) throw new ArgumentNullException(nameof(handler));
- if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null)
- throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed");
+ // insert at the start of the linked-list
+ ChannelMessageQueue? old;
+ do
+ {
+ old = Volatile.Read(ref head);
+ queue._next = old;
+ }
+ // format and validator disagree on newline...
+ while (Interlocked.CompareExchange(ref head, queue, old) != old);
}
+ }
- ///
- /// Create a message loop that processes messages sequentially.
- ///
- /// The handler to run when receiving a message.
- public void OnMessage(Action handler)
- {
- AssertOnMessage(handler);
+ ///
+ /// Create a message loop that processes messages sequentially.
+ ///
+ /// The handler to execute when receiving a message.
+ public void OnMessage(Func handler)
+ {
+ AssertOnMessage(handler);
- ThreadPool.QueueUserWorkItem(
- state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this);
- }
+ ThreadPool.QueueUserWorkItem(
+ state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this);
+ }
- private async Task OnMessageSyncImpl()
+ internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ {
+ if (queue is null)
{
- var handler = (Action?)_onMessageHandler;
- while (!Completion.IsCompleted)
- {
- ChannelMessage next;
- try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
- catch (ChannelClosedException) { break; } // expected
- catch (Exception ex)
- {
- _parent?.multiplexer?.OnInternalError(ex);
- break;
- }
-
- try { handler?.Invoke(next); }
- catch { } // matches MessageCompletable
- }
+ return;
}
- internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ bool found;
+ // if we fail due to a conflict, re-do from start
+ do
{
- if (queue != null)
+ var current = Volatile.Read(ref head);
+ if (current == null) return; // no queue? nothing to do
+ if (current == queue)
{
- // insert at the start of the linked-list
- ChannelMessageQueue? old;
- do
+ found = true;
+ // found at the head - then we need to change the head
+ if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current)
{
- old = Volatile.Read(ref head);
- queue._next = old;
+ return; // success
}
- while (Interlocked.CompareExchange(ref head, queue, old) != old);
}
- }
-
- ///
- /// Create a message loop that processes messages sequentially.
- ///
- /// The handler to execute when receiving a message.
- public void OnMessage(Func handler)
- {
- AssertOnMessage(handler);
-
- ThreadPool.QueueUserWorkItem(
- state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this);
- }
-
- internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
- {
- if (queue is null)
+ else
{
- return;
- }
-
- bool found;
- // if we fail due to a conflict, re-do from start
- do
- {
- var current = Volatile.Read(ref head);
- if (current == null) return; // no queue? nothing to do
- if (current == queue)
- {
- found = true;
- // found at the head - then we need to change the head
- if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current)
- {
- return; // success
- }
- }
- else
+ ChannelMessageQueue? previous = current;
+ current = Volatile.Read(ref previous._next);
+ found = false;
+ do
{
- ChannelMessageQueue? previous = current;
- current = Volatile.Read(ref previous._next);
- found = false;
- do
+ if (current == queue)
{
- if (current == queue)
+ found = true;
+ // found it, not at the head; remove the node
+ if (Interlocked.CompareExchange(
+ ref previous._next,
+ Volatile.Read(ref current._next),
+ current) == current)
{
- found = true;
- // found it, not at the head; remove the node
- if (Interlocked.CompareExchange(ref previous._next, Volatile.Read(ref current._next), current) == current)
- {
- return; // success
- }
- else
- {
- break; // exit the inner loop, and repeat the outer loop
- }
+ return; // success
+ }
+ else
+ {
+ break; // exit the inner loop, and repeat the outer loop
}
- previous = current;
- current = Volatile.Read(ref previous!._next);
}
- while (current != null);
+
+ previous = current;
+ current = Volatile.Read(ref previous!._next);
}
+ // format and validator disagree on newline...
+ while (current != null);
}
- while (found);
}
+ // format and validator disagree on newline...
+ while (found);
+ }
- internal static int Count(ref ChannelMessageQueue? head)
+ internal static int Count(ref ChannelMessageQueue? head)
+ {
+ var current = Volatile.Read(ref head);
+ int count = 0;
+ while (current != null)
{
- var current = Volatile.Read(ref head);
- int count = 0;
- while (current != null)
- {
- count++;
- current = Volatile.Read(ref current._next);
- }
- return count;
+ count++;
+ current = Volatile.Read(ref current._next);
}
- internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message)
+ return count;
+ }
+
+ internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message)
+ {
+ var current = Volatile.Read(ref head);
+ while (current != null)
{
- var current = Volatile.Read(ref head);
- while (current != null)
- {
- current.Write(channel, message);
- current = Volatile.Read(ref current._next);
- }
+ current.Write(channel, message);
+ current = Volatile.Read(ref current._next);
}
+ }
- private ChannelMessageQueue? _next;
+ private ChannelMessageQueue? _next;
- private async Task OnMessageAsyncImpl()
+ private async Task OnMessageAsyncImpl()
+ {
+ var handler = (Func?)_onMessageHandler;
+ while (!Completion.IsCompleted)
{
- var handler = (Func?)_onMessageHandler;
- while (!Completion.IsCompleted)
+ ChannelMessage next;
+ try
{
- ChannelMessage next;
- try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
- catch (ChannelClosedException) { break; } // expected
- catch (Exception ex)
- {
- _parent?.multiplexer?.OnInternalError(ex);
- break;
- }
-
- try
- {
- var task = handler?.Invoke(next);
- if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
- }
- catch { } // matches MessageCompletable
+ if (!TryRead(out next)) next = await ReadAsync().ForAwait();
+ }
+ catch (ChannelClosedException) { break; } // expected
+ catch (Exception ex)
+ {
+ _parent?.multiplexer?.OnInternalError(ex);
+ break;
}
- }
- internal static void MarkAllCompleted(ref ChannelMessageQueue? head)
- {
- var current = Interlocked.Exchange(ref head, null);
- while (current != null)
+ try
{
- current.MarkCompleted();
- current = Volatile.Read(ref current._next);
+ var task = handler?.Invoke(next);
+ if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
}
+ catch { } // matches MessageCompletable
}
+ }
- private void MarkCompleted(Exception? error = null)
+ internal static void MarkAllCompleted(ref ChannelMessageQueue? head)
+ {
+ var current = Interlocked.Exchange(ref head, null);
+ while (current != null)
{
- _parent = null;
- _queue.Writer.TryComplete(error);
+ current.MarkCompleted();
+ current = Volatile.Read(ref current._next);
}
+ }
- internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
- {
- var parent = _parent;
- _parent = null;
- parent?.UnsubscribeAsync(Channel, null, this, flags);
- _queue.Writer.TryComplete(error);
- }
+ private void MarkCompleted(Exception? error = null)
+ {
+ _parent = null;
+ _queue.Writer.TryComplete(error);
+ }
- internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ {
+ var parent = _parent;
+ _parent = null;
+ parent?.UnsubscribeAsync(Channel, null, this, flags);
+ _queue.Writer.TryComplete(error);
+ }
+
+ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ {
+ var parent = _parent;
+ _parent = null;
+ if (parent != null)
{
- var parent = _parent;
- _parent = null;
- if (parent != null)
- {
- await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait();
- }
- _queue.Writer.TryComplete(error);
+ await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait();
}
- ///
- /// Stop receiving messages on this channel.
- ///
- /// The flags to use when unsubscribing.
- public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
+ _queue.Writer.TryComplete(error);
+ }
- ///
- /// Stop receiving messages on this channel.
- ///
- /// The flags to use when unsubscribing.
- public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
+ ///
+ /// Stop receiving messages on this channel.
+ ///
+ /// The flags to use when unsubscribing.
+ public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
- ///
+ ///
+ /// Stop receiving messages on this channel.
+ ///
+ /// The flags to use when unsubscribing.
+ public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
+
+ ///
#if NETCOREAPP3_0_OR_GREATER
- public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
- => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
+ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ // ReSharper disable once MethodSupportsCancellation - provided in GetAsyncEnumerator
+ => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
#else
- public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ {
+ while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
- while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
+ while (_queue.Reader.TryRead(out var item))
{
- while (_queue.Reader.TryRead(out var item))
- {
- yield return item;
- }
+ yield return item;
}
}
-#endif
}
+#endif
}
diff --git a/src/StackExchange.Redis/KeyNotification.cs b/src/StackExchange.Redis/KeyNotification.cs
new file mode 100644
index 000000000..f563019f3
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotification.cs
@@ -0,0 +1,244 @@
+using System;
+using System.Buffers.Text;
+using System.Diagnostics;
+using static StackExchange.Redis.KeyNotificationChannels;
+namespace StackExchange.Redis;
+
+///
+/// Represents keyspace and keyevent notifications.
+///
+public readonly struct KeyNotification
+{
+ ///
+ /// If the channel is either a keyspace or keyevent notification, parsed the data.
+ ///
+ public static bool TryParse(in RedisChannel channel, in RedisValue value, out KeyNotification notification)
+ {
+ // validate that it looks reasonable
+ var span = channel.Span;
+
+ // KeySpaceStart and KeyEventStart are the same size, see KeyEventPrefix_KeySpacePrefix_Length_Matches
+ if (span.Length >= KeySpacePrefix.Length + MinSuffixBytes)
+ {
+ // check that the prefix is valid, i.e. "__keyspace@" or "__keyevent@"
+ var prefix = span.Slice(0, KeySpacePrefix.Length);
+ var hash = prefix.Hash64();
+ switch (hash)
+ {
+ case KeySpacePrefix.Hash when KeySpacePrefix.Is(hash, prefix):
+ case KeyEventPrefix.Hash when KeyEventPrefix.Is(hash, prefix):
+ // check that there is *something* non-empty after the prefix, with __: as the suffix (we don't verify *what*)
+ if (span.Slice(KeySpacePrefix.Length).IndexOf("__:"u8) > 0)
+ {
+ notification = new KeyNotification(in channel, in value);
+ return true;
+ }
+
+ break;
+ }
+ }
+
+ notification = default;
+ return false;
+ }
+
+ private const int MinSuffixBytes = 5; // need "0__:x" or similar after prefix
+
+ ///
+ /// The channel associated with this notification.
+ ///
+ public RedisChannel Channel => _channel;
+
+ ///
+ /// The payload associated with this notification.
+ ///
+ public RedisValue Value => _value;
+
+ // effectively we just wrap a channel, but: we've pre-validated that things make sense
+ private readonly RedisChannel _channel;
+ private readonly RedisValue _value;
+
+ internal KeyNotification(in RedisChannel channel, in RedisValue value)
+ {
+ _channel = channel;
+ _value = value;
+ }
+
+ ///
+ /// The database the key is in. If the database cannot be parsed, -1 is returned.
+ ///
+ public int Database
+ {
+ get
+ {
+ // prevalidated format, so we can just skip past the prefix (except for the default value)
+ if (_channel.IsNull) return -1;
+ var span = _channel.Span.Slice(KeySpacePrefix.Length); // also works for KeyEventPrefix
+ var end = span.IndexOf((byte)'_'); // expecting "__:foo" - we'll just stop at the underscore
+ if (end <= 0) return -1;
+
+ span = span.Slice(0, end);
+ return Utf8Parser.TryParse(span, out int database, out var bytes)
+ && bytes == end ? database : -1;
+ }
+ }
+
+ ///
+ /// The key associated with this event.
+ ///
+ /// Note that this will allocate a copy of the key bytes; to avoid allocations,
+ /// the and APIs can be used.
+ public RedisKey GetKey()
+ {
+ if (IsKeySpace)
+ {
+ // then the channel contains the key, and the payload contains the event-type
+ return ChannelSuffix.ToArray(); // create an isolated copy
+ }
+
+ if (IsKeyEvent)
+ {
+ // then the channel contains the event-type, and the payload contains the key
+ return (byte[]?)Value; // todo: this could probably side-step
+ }
+
+ return RedisKey.Null;
+ }
+
+ ///
+ /// Get the number of bytes in the key.
+ ///
+ public int KeyByteCount
+ {
+ get
+ {
+ if (IsKeySpace)
+ {
+ return ChannelSuffix.Length;
+ }
+
+ if (IsKeyEvent)
+ {
+ return _value.GetByteCount();
+ }
+
+ return 0;
+ }
+ }
+
+ ///
+ /// Attempt to copy the bytes from the key to a buffer, returning the number of bytes written.
+ ///
+ public bool TryCopyKey(Span destination, out int bytesWritten)
+ {
+ if (IsKeySpace)
+ {
+ var suffix = ChannelSuffix;
+ bytesWritten = suffix.Length; // assume success
+ if (bytesWritten <= destination.Length)
+ {
+ suffix.CopyTo(destination);
+ return true;
+ }
+ }
+
+ if (IsKeyEvent)
+ {
+ bytesWritten = _value.GetByteCount();
+ if (bytesWritten <= destination.Length)
+ {
+ var tmp = _value.CopyTo(destination);
+ Debug.Assert(tmp == bytesWritten);
+ return true;
+ }
+ }
+
+ bytesWritten = 0;
+ return false;
+ }
+
+ ///
+ /// Get the portion of the channel after the "__{keyspace|keyevent}@{db}__:".
+ ///
+ private ReadOnlySpan ChannelSuffix
+ {
+ get
+ {
+ var span = _channel.Span;
+ var index = span.IndexOf("__:"u8);
+ return index > 0 ? span.Slice(index + 3) : default;
+ }
+ }
+
+ ///
+ /// The type of notification associated with this event, if it is well-known - otherwise .
+ ///
+ /// Unexpected values can be processed manually from the and .
+ public KeyNotificationType Type
+ {
+ get
+ {
+ if (IsKeySpace)
+ {
+ // then the channel contains the key, and the payload contains the event-type
+ var count = _value.GetByteCount();
+ if (count >= KeyNotificationTypeFastHash.MinBytes & count <= KeyNotificationTypeFastHash.MaxBytes)
+ {
+ if (_value.TryGetSpan(out var direct))
+ {
+ return KeyNotificationTypeFastHash.Parse(direct);
+ }
+ else
+ {
+ Span localCopy = stackalloc byte[KeyNotificationTypeFastHash.MaxBytes];
+ return KeyNotificationTypeFastHash.Parse(localCopy.Slice(0, _value.CopyTo(localCopy)));
+ }
+ }
+ }
+
+ if (IsKeyEvent)
+ {
+ // then the channel contains the event-type, and the payload contains the key
+ return KeyNotificationTypeFastHash.Parse(ChannelSuffix);
+ }
+ return KeyNotificationType.Unknown;
+ }
+ }
+
+ ///
+ /// Indicates whether this notification originated from a keyspace notification, for example __keyspace@4__:mykey with payload set.
+ ///
+ public bool IsKeySpace
+ {
+ get
+ {
+ var span = _channel.Span;
+ return span.Length >= KeySpacePrefix.Length + MinSuffixBytes && KeySpacePrefix.Is(span.Hash64(), span.Slice(0, KeySpacePrefix.Length));
+ }
+ }
+
+ ///
+ /// Indicates whether this notification originated from a keyevent notification, for example __keyevent@4__:set with payload mykey.
+ ///
+ public bool IsKeyEvent
+ {
+ get
+ {
+ var span = _channel.Span;
+ return span.Length >= KeyEventPrefix.Length + MinSuffixBytes && KeyEventPrefix.Is(span.Hash64(), span.Slice(0, KeyEventPrefix.Length));
+ }
+ }
+}
+
+internal static partial class KeyNotificationChannels
+{
+ [FastHash("__keyspace@")]
+ internal static partial class KeySpacePrefix
+ {
+ }
+
+ [FastHash("__keyevent@")]
+ internal static partial class KeyEventPrefix
+ {
+ }
+}
diff --git a/src/StackExchange.Redis/KeyNotificationType.cs b/src/StackExchange.Redis/KeyNotificationType.cs
new file mode 100644
index 000000000..cc4c74ef1
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotificationType.cs
@@ -0,0 +1,69 @@
+namespace StackExchange.Redis;
+
+///
+/// The type of keyspace or keyevent notification.
+///
+public enum KeyNotificationType
+{
+ // note: initially presented alphabetically, but: new values *must* be appended, not inserted
+ // (to preserve values of existing elements)
+#pragma warning disable CS1591 // docs, redundant
+ Unknown = 0,
+ Append = 1,
+ Copy = 2,
+ Del = 3,
+ Expire = 4,
+ HDel = 5,
+ HExpired = 6,
+ HIncrByFloat = 7,
+ HIncrBy = 8,
+ HPersist = 9,
+ HSet = 10,
+ IncrByFloat = 11,
+ IncrBy = 12,
+ LInsert = 13,
+ LPop = 14,
+ LPush = 15,
+ LRem = 16,
+ LSet = 17,
+ LTrim = 18,
+ MoveFrom = 19,
+ MoveTo = 20,
+ Persist = 21,
+ RenameFrom = 22,
+ RenameTo = 23,
+ Restore = 24,
+ RPop = 25,
+ RPush = 26,
+ SAdd = 27,
+ Set = 28,
+ SetRange = 29,
+ SortStore = 30,
+ SRem = 31,
+ SPop = 32,
+ XAdd = 33,
+ XDel = 34,
+ XGroupCreateConsumer = 35,
+ XGroupCreate = 36,
+ XGroupDelConsumer = 37,
+ XGroupDestroy = 38,
+ XGroupSetId = 39,
+ XSetId = 40,
+ XTrim = 41,
+ ZAdd = 42,
+ ZDiffStore = 43,
+ ZInterStore = 44,
+ ZUnionStore = 45,
+ ZIncr = 46,
+ ZRemByRank = 47,
+ ZRemByScore = 48,
+ ZRem = 49,
+
+ // side-effect notifications
+ Expired = 1000,
+ Evicted = 1001,
+ New = 1002,
+ Overwritten = 1003,
+ TypeChanged = 1004, // type_changed
+#pragma warning restore CS1591 // docs, redundant
+}
diff --git a/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs
new file mode 100644
index 000000000..bcf08bad2
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs
@@ -0,0 +1,413 @@
+using System;
+
+namespace StackExchange.Redis;
+
+///
+/// Internal helper type for fast parsing of key notification types, using [FastHash].
+///
+internal static partial class KeyNotificationTypeFastHash
+{
+ // these are checked by KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths
+ public const int MinBytes = 3, MaxBytes = 21;
+
+ public static KeyNotificationType Parse(ReadOnlySpan value)
+ {
+ var hash = value.Hash64();
+ return hash switch
+ {
+ append.Hash when append.Is(hash, value) => KeyNotificationType.Append,
+ copy.Hash when copy.Is(hash, value) => KeyNotificationType.Copy,
+ del.Hash when del.Is(hash, value) => KeyNotificationType.Del,
+ expire.Hash when expire.Is(hash, value) => KeyNotificationType.Expire,
+ hdel.Hash when hdel.Is(hash, value) => KeyNotificationType.HDel,
+ hexpired.Hash when hexpired.Is(hash, value) => KeyNotificationType.HExpired,
+ hincrbyfloat.Hash when hincrbyfloat.Is(hash, value) => KeyNotificationType.HIncrByFloat,
+ hincrby.Hash when hincrby.Is(hash, value) => KeyNotificationType.HIncrBy,
+ hpersist.Hash when hpersist.Is(hash, value) => KeyNotificationType.HPersist,
+ hset.Hash when hset.Is(hash, value) => KeyNotificationType.HSet,
+ incrbyfloat.Hash when incrbyfloat.Is(hash, value) => KeyNotificationType.IncrByFloat,
+ incrby.Hash when incrby.Is(hash, value) => KeyNotificationType.IncrBy,
+ linsert.Hash when linsert.Is(hash, value) => KeyNotificationType.LInsert,
+ lpop.Hash when lpop.Is(hash, value) => KeyNotificationType.LPop,
+ lpush.Hash when lpush.Is(hash, value) => KeyNotificationType.LPush,
+ lrem.Hash when lrem.Is(hash, value) => KeyNotificationType.LRem,
+ lset.Hash when lset.Is(hash, value) => KeyNotificationType.LSet,
+ ltrim.Hash when ltrim.Is(hash, value) => KeyNotificationType.LTrim,
+ move_from.Hash when move_from.Is(hash, value) => KeyNotificationType.MoveFrom,
+ move_to.Hash when move_to.Is(hash, value) => KeyNotificationType.MoveTo,
+ persist.Hash when persist.Is(hash, value) => KeyNotificationType.Persist,
+ rename_from.Hash when rename_from.Is(hash, value) => KeyNotificationType.RenameFrom,
+ rename_to.Hash when rename_to.Is(hash, value) => KeyNotificationType.RenameTo,
+ restore.Hash when restore.Is(hash, value) => KeyNotificationType.Restore,
+ rpop.Hash when rpop.Is(hash, value) => KeyNotificationType.RPop,
+ rpush.Hash when rpush.Is(hash, value) => KeyNotificationType.RPush,
+ sadd.Hash when sadd.Is(hash, value) => KeyNotificationType.SAdd,
+ set.Hash when set.Is(hash, value) => KeyNotificationType.Set,
+ setrange.Hash when setrange.Is(hash, value) => KeyNotificationType.SetRange,
+ sortstore.Hash when sortstore.Is(hash, value) => KeyNotificationType.SortStore,
+ srem.Hash when srem.Is(hash, value) => KeyNotificationType.SRem,
+ spop.Hash when spop.Is(hash, value) => KeyNotificationType.SPop,
+ xadd.Hash when xadd.Is(hash, value) => KeyNotificationType.XAdd,
+ xdel.Hash when xdel.Is(hash, value) => KeyNotificationType.XDel,
+ xgroup_createconsumer.Hash when xgroup_createconsumer.Is(hash, value) => KeyNotificationType.XGroupCreateConsumer,
+ xgroup_create.Hash when xgroup_create.Is(hash, value) => KeyNotificationType.XGroupCreate,
+ xgroup_delconsumer.Hash when xgroup_delconsumer.Is(hash, value) => KeyNotificationType.XGroupDelConsumer,
+ xgroup_destroy.Hash when xgroup_destroy.Is(hash, value) => KeyNotificationType.XGroupDestroy,
+ xgroup_setid.Hash when xgroup_setid.Is(hash, value) => KeyNotificationType.XGroupSetId,
+ xsetid.Hash when xsetid.Is(hash, value) => KeyNotificationType.XSetId,
+ xtrim.Hash when xtrim.Is(hash, value) => KeyNotificationType.XTrim,
+ zadd.Hash when zadd.Is(hash, value) => KeyNotificationType.ZAdd,
+ zdiffstore.Hash when zdiffstore.Is(hash, value) => KeyNotificationType.ZDiffStore,
+ zinterstore.Hash when zinterstore.Is(hash, value) => KeyNotificationType.ZInterStore,
+ zunionstore.Hash when zunionstore.Is(hash, value) => KeyNotificationType.ZUnionStore,
+ zincr.Hash when zincr.Is(hash, value) => KeyNotificationType.ZIncr,
+ zrembyrank.Hash when zrembyrank.Is(hash, value) => KeyNotificationType.ZRemByRank,
+ zrembyscore.Hash when zrembyscore.Is(hash, value) => KeyNotificationType.ZRemByScore,
+ zrem.Hash when zrem.Is(hash, value) => KeyNotificationType.ZRem,
+ expired.Hash when expired.Is(hash, value) => KeyNotificationType.Expired,
+ evicted.Hash when evicted.Is(hash, value) => KeyNotificationType.Evicted,
+ _new.Hash when _new.Is(hash, value) => KeyNotificationType.New,
+ overwritten.Hash when overwritten.Is(hash, value) => KeyNotificationType.Overwritten,
+ type_changed.Hash when type_changed.Is(hash, value) => KeyNotificationType.TypeChanged,
+ _ => KeyNotificationType.Unknown,
+ };
+ }
+
+ internal static ReadOnlySpan GetRawBytes(KeyNotificationType type)
+ {
+ return type switch
+ {
+ KeyNotificationType.Append => append.U8,
+ KeyNotificationType.Copy => copy.U8,
+ KeyNotificationType.Del => del.U8,
+ KeyNotificationType.Expire => expire.U8,
+ KeyNotificationType.HDel => hdel.U8,
+ KeyNotificationType.HExpired => hexpired.U8,
+ KeyNotificationType.HIncrByFloat => hincrbyfloat.U8,
+ KeyNotificationType.HIncrBy => hincrby.U8,
+ KeyNotificationType.HPersist => hpersist.U8,
+ KeyNotificationType.HSet => hset.U8,
+ KeyNotificationType.IncrByFloat => incrbyfloat.U8,
+ KeyNotificationType.IncrBy => incrby.U8,
+ KeyNotificationType.LInsert => linsert.U8,
+ KeyNotificationType.LPop => lpop.U8,
+ KeyNotificationType.LPush => lpush.U8,
+ KeyNotificationType.LRem => lrem.U8,
+ KeyNotificationType.LSet => lset.U8,
+ KeyNotificationType.LTrim => ltrim.U8,
+ KeyNotificationType.MoveFrom => move_from.U8,
+ KeyNotificationType.MoveTo => move_to.U8,
+ KeyNotificationType.Persist => persist.U8,
+ KeyNotificationType.RenameFrom => rename_from.U8,
+ KeyNotificationType.RenameTo => rename_to.U8,
+ KeyNotificationType.Restore => restore.U8,
+ KeyNotificationType.RPop => rpop.U8,
+ KeyNotificationType.RPush => rpush.U8,
+ KeyNotificationType.SAdd => sadd.U8,
+ KeyNotificationType.Set => set.U8,
+ KeyNotificationType.SetRange => setrange.U8,
+ KeyNotificationType.SortStore => sortstore.U8,
+ KeyNotificationType.SRem => srem.U8,
+ KeyNotificationType.SPop => spop.U8,
+ KeyNotificationType.XAdd => xadd.U8,
+ KeyNotificationType.XDel => xdel.U8,
+ KeyNotificationType.XGroupCreateConsumer => xgroup_createconsumer.U8,
+ KeyNotificationType.XGroupCreate => xgroup_create.U8,
+ KeyNotificationType.XGroupDelConsumer => xgroup_delconsumer.U8,
+ KeyNotificationType.XGroupDestroy => xgroup_destroy.U8,
+ KeyNotificationType.XGroupSetId => xgroup_setid.U8,
+ KeyNotificationType.XSetId => xsetid.U8,
+ KeyNotificationType.XTrim => xtrim.U8,
+ KeyNotificationType.ZAdd => zadd.U8,
+ KeyNotificationType.ZDiffStore => zdiffstore.U8,
+ KeyNotificationType.ZInterStore => zinterstore.U8,
+ KeyNotificationType.ZUnionStore => zunionstore.U8,
+ KeyNotificationType.ZIncr => zincr.U8,
+ KeyNotificationType.ZRemByRank => zrembyrank.U8,
+ KeyNotificationType.ZRemByScore => zrembyscore.U8,
+ KeyNotificationType.ZRem => zrem.U8,
+ KeyNotificationType.Expired => expired.U8,
+ KeyNotificationType.Evicted => evicted.U8,
+ KeyNotificationType.New => _new.U8,
+ KeyNotificationType.Overwritten => overwritten.U8,
+ KeyNotificationType.TypeChanged => type_changed.U8,
+ _ => Throw(),
+ };
+ static ReadOnlySpan Throw() => throw new ArgumentOutOfRangeException(nameof(type));
+ }
+
+#pragma warning disable SA1300, CS8981
+ // ReSharper disable InconsistentNaming
+ [FastHash]
+ internal static partial class append
+ {
+ }
+
+ [FastHash]
+ internal static partial class copy
+ {
+ }
+
+ [FastHash]
+ internal static partial class del
+ {
+ }
+
+ [FastHash]
+ internal static partial class expire
+ {
+ }
+
+ [FastHash]
+ internal static partial class hdel
+ {
+ }
+
+ [FastHash]
+ internal static partial class hexpired
+ {
+ }
+
+ [FastHash]
+ internal static partial class hincrbyfloat
+ {
+ }
+
+ [FastHash]
+ internal static partial class hincrby
+ {
+ }
+
+ [FastHash]
+ internal static partial class hpersist
+ {
+ }
+
+ [FastHash]
+ internal static partial class hset
+ {
+ }
+
+ [FastHash]
+ internal static partial class incrbyfloat
+ {
+ }
+
+ [FastHash]
+ internal static partial class incrby
+ {
+ }
+
+ [FastHash]
+ internal static partial class linsert
+ {
+ }
+
+ [FastHash]
+ internal static partial class lpop
+ {
+ }
+
+ [FastHash]
+ internal static partial class lpush
+ {
+ }
+
+ [FastHash]
+ internal static partial class lrem
+ {
+ }
+
+ [FastHash]
+ internal static partial class lset
+ {
+ }
+
+ [FastHash]
+ internal static partial class ltrim
+ {
+ }
+
+ [FastHash("move_from")] // by default, the generator interprets underscore as hyphen
+ internal static partial class move_from
+ {
+ }
+
+ [FastHash("move_to")] // by default, the generator interprets underscore as hyphen
+ internal static partial class move_to
+ {
+ }
+
+ [FastHash]
+ internal static partial class persist
+ {
+ }
+
+ [FastHash("rename_from")] // by default, the generator interprets underscore as hyphen
+ internal static partial class rename_from
+ {
+ }
+
+ [FastHash("rename_to")] // by default, the generator interprets underscore as hyphen
+ internal static partial class rename_to
+ {
+ }
+
+ [FastHash]
+ internal static partial class restore
+ {
+ }
+
+ [FastHash]
+ internal static partial class rpop
+ {
+ }
+
+ [FastHash]
+ internal static partial class rpush
+ {
+ }
+
+ [FastHash]
+ internal static partial class sadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class set
+ {
+ }
+
+ [FastHash]
+ internal static partial class setrange
+ {
+ }
+
+ [FastHash]
+ internal static partial class sortstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class srem
+ {
+ }
+
+ [FastHash]
+ internal static partial class spop
+ {
+ }
+
+ [FastHash]
+ internal static partial class xadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class xdel
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_createconsumer
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_create
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_delconsumer
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_destroy
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_setid
+ {
+ }
+
+ [FastHash]
+ internal static partial class xsetid
+ {
+ }
+
+ [FastHash]
+ internal static partial class xtrim
+ {
+ }
+
+ [FastHash]
+ internal static partial class zadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class zdiffstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zinterstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zunionstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zincr
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrembyrank
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrembyscore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrem
+ {
+ }
+
+ [FastHash]
+ internal static partial class expired
+ {
+ }
+
+ [FastHash]
+ internal static partial class evicted
+ {
+ }
+
+ [FastHash("new")]
+ internal static partial class _new // it isn't worth making the code-gen keyword aware
+ {
+ }
+
+ [FastHash]
+ internal static partial class overwritten
+ {
+ }
+
+ [FastHash("type_changed")] // by default, the generator interprets underscore as hyphen
+ internal static partial class type_changed
+ {
+ }
+
+ // ReSharper restore InconsistentNaming
+#pragma warning restore SA1300, CS8981
+}
diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
index 91b0e1a43..871fe71f0 100644
--- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
+++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
@@ -1 +1,74 @@
-#nullable enable
\ No newline at end of file
+#nullable enable
+StackExchange.Redis.KeyNotification
+StackExchange.Redis.KeyNotification.Channel.get -> StackExchange.Redis.RedisChannel
+StackExchange.Redis.KeyNotification.Value.get -> StackExchange.Redis.RedisValue
+StackExchange.Redis.KeyNotification.Database.get -> int
+StackExchange.Redis.KeyNotification.GetKey() -> StackExchange.Redis.RedisKey
+StackExchange.Redis.KeyNotification.IsKeyEvent.get -> bool
+StackExchange.Redis.KeyNotification.IsKeySpace.get -> bool
+StackExchange.Redis.KeyNotification.KeyByteCount.get -> int
+StackExchange.Redis.KeyNotification.KeyNotification() -> void
+StackExchange.Redis.KeyNotification.TryCopyKey(System.Span destination, out int bytesWritten) -> bool
+StackExchange.Redis.KeyNotification.Type.get -> StackExchange.Redis.KeyNotificationType
+static StackExchange.Redis.KeyNotification.TryParse(in StackExchange.Redis.RedisChannel channel, in StackExchange.Redis.RedisValue value, out StackExchange.Redis.KeyNotification notification) -> bool
+StackExchange.Redis.ChannelMessage.TryParseKeyNotification(out StackExchange.Redis.KeyNotification notification) -> bool
+static StackExchange.Redis.RedisChannel.KeyEvent(StackExchange.Redis.KeyNotificationType type, int? database = null) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeyEvent(System.ReadOnlySpan type, int? database) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeySpace(in StackExchange.Redis.RedisKey key, int database) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeySpacePattern(in StackExchange.Redis.RedisKey pattern, int? database = null) -> StackExchange.Redis.RedisChannel
+StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Append = 1 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Copy = 2 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Del = 3 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Evicted = 1001 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Expire = 4 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Expired = 1000 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HDel = 5 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HExpired = 6 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HIncrBy = 8 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HIncrByFloat = 7 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HPersist = 9 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HSet = 10 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.IncrBy = 12 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.IncrByFloat = 11 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LInsert = 13 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LPop = 14 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LPush = 15 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LRem = 16 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LSet = 17 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LTrim = 18 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.MoveFrom = 19 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.MoveTo = 20 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.New = 1002 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Overwritten = 1003 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Persist = 21 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RenameFrom = 22 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RenameTo = 23 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Restore = 24 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RPop = 25 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RPush = 26 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SAdd = 27 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Set = 28 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SetRange = 29 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SortStore = 30 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SPop = 32 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SRem = 31 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.TypeChanged = 1004 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Unknown = 0 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XAdd = 33 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XDel = 34 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupCreate = 36 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupCreateConsumer = 35 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupDelConsumer = 37 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupDestroy = 38 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupSetId = 39 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XSetId = 40 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XTrim = 41 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZAdd = 42 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZDiffStore = 43 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZIncr = 46 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZInterStore = 44 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRem = 49 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRemByRank = 47 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRemByScore = 48 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZUnionStore = 45 -> StackExchange.Redis.KeyNotificationType
diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs
index d4289f3c6..7b208bb9d 100644
--- a/src/StackExchange.Redis/RedisChannel.cs
+++ b/src/StackExchange.Redis/RedisChannel.cs
@@ -1,4 +1,5 @@
using System;
+using System.Diagnostics;
using System.Text;
namespace StackExchange.Redis
@@ -10,6 +11,8 @@ namespace StackExchange.Redis
{
internal readonly byte[]? Value;
+ internal ReadOnlySpan Span => Value is null ? default : Value.AsSpan();
+
internal readonly RedisChannelOptions Options;
[Flags]
@@ -19,19 +22,36 @@ internal enum RedisChannelOptions
Pattern = 1 << 0,
Sharded = 1 << 1,
KeyRouted = 1 << 2,
+ MultiNode = 1 << 3,
}
// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
- private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted;
+ private const RedisChannelOptions EqualityMask =
+ ~(RedisChannelOptions.KeyRouted | RedisChannelOptions.MultiNode);
- internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
+ internal RedisCommand GetPublishCommand()
+ {
+ return (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.MultiNode)) switch
+ {
+ RedisChannelOptions.None => RedisCommand.PUBLISH,
+ RedisChannelOptions.Sharded => RedisCommand.SPUBLISH,
+ _ => ThrowKeyRouted(),
+ };
+
+ static RedisCommand ThrowKeyRouted() => throw new InvalidOperationException("Publishing is not supported for multi-node channels");
+ }
///
- /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
+ /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
/// or to scenarios using .
///
internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0;
+ ///
+ /// Should this channel be subscribed to on all nodes? This is only relevant for cluster scenarios and keyspace notifications.
+ ///
+ internal bool IsMultiNode => (Options & RedisChannelOptions.MultiNode) != 0;
+
///
/// Indicates whether the channel-name is either null or a zero-length value.
///
@@ -58,6 +78,7 @@ public static bool UseImplicitAutoPattern
get => s_DefaultPatternMode == PatternMode.Auto;
set => s_DefaultPatternMode = value ? PatternMode.Auto : PatternMode.Literal;
}
+
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;
///
@@ -82,7 +103,13 @@ public static bool UseImplicitAutoPattern
/// a consideration.
///
/// Note that channels from Sharded are always routed.
- public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted);
+ public RedisChannel WithKeyRouting()
+ {
+ if (IsMultiNode) Throw();
+ return new(Value, Options | RedisChannelOptions.KeyRouted);
+
+ static void Throw() => throw new InvalidOperationException("Key routing is not supported for multi-node channels");
+ }
///
/// Creates a new that acts as a wildcard subscription. In cluster
@@ -105,7 +132,8 @@ public static bool UseImplicitAutoPattern
///
/// The name of the channel to create.
/// The mode for name matching.
- public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
+ public RedisChannel(byte[]? value, PatternMode mode) : this(
+ value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
{
}
@@ -115,7 +143,9 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
/// The string name of the channel to create.
/// The mode for name matching.
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
- public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
+ public RedisChannel(string value, PatternMode mode) : this(
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
+ value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}
@@ -128,7 +158,8 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null
/// The name of the channel to create.
/// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).
- public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+ public static RedisChannel Sharded(byte[]? value) =>
+ new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
///
/// Create a new redis channel from a string, representing a sharded channel. In cluster
@@ -139,7 +170,112 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null
/// The string name of the channel to create.
/// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).
- public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+ public static RedisChannel Sharded(string value) =>
+ new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+
+ ///
+ /// Create a key-notification channel for a single key in a single database.
+ ///
+ public static RedisChannel KeySpace(in RedisKey key, int database)
+ => BuildKeySpace(key, database, RedisChannelOptions.KeyRouted);
+
+ ///
+ /// Create a key-notification channel for a pattern, optionally in a specified database.
+ ///
+ public static RedisChannel KeySpacePattern(in RedisKey pattern, int? database = null)
+ => BuildKeySpace(pattern, database, RedisChannelOptions.Pattern | RedisChannelOptions.MultiNode);
+
+ private const int DatabaseScratchBufferSize = 16; // largest non-negative int32 is 10 digits
+
+ private static ReadOnlySpan AppendDatabase(Span target, int? database, RedisChannelOptions options)
+ {
+ if (database is null)
+ {
+ if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(database));
+ return "*"u8; // don't worry about the inbound scratch buffer, this is fine
+ }
+ else
+ {
+ var db32 = database.GetValueOrDefault();
+ if (db32 == 0) return "0"u8; // so common, we might as well special case
+ if (db32 < 0) throw new ArgumentOutOfRangeException(nameof(database));
+ return target.Slice(0, Format.FormatInt32(db32, target));
+ }
+ }
+
+ ///
+ /// Create an event-notification channel for a given event type, optionally in a specified database.
+ ///
+#pragma warning disable RS0027
+ public static RedisChannel KeyEvent(KeyNotificationType type, int? database = null)
+#pragma warning restore RS0027
+ => KeyEvent(KeyNotificationTypeFastHash.GetRawBytes(type), database);
+
+ ///
+ /// Create an event-notification channel for a given event type, optionally in a specified database.
+ ///
+ /// This API is intended for use with custom/unknown event types; for well-known types, use .
+ public static RedisChannel KeyEvent(ReadOnlySpan type, int? database)
+ {
+ if (type.IsEmpty) throw new ArgumentNullException(nameof(type));
+
+ RedisChannelOptions options = RedisChannelOptions.MultiNode;
+ if (database is null) options |= RedisChannelOptions.Pattern;
+ var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options);
+
+ // __keyevent@{db}__:{type}
+ var arr = new byte[14 + db.Length + type.Length];
+
+ var target = AppendAndAdvance(arr.AsSpan(), "__keyevent@"u8);
+ target = AppendAndAdvance(target, db);
+ target = AppendAndAdvance(target, "__:"u8);
+ target = AppendAndAdvance(target, type);
+ Debug.Assert(target.IsEmpty); // should have calculated length correctly
+
+ return new RedisChannel(arr, options);
+ }
+
+ private static Span AppendAndAdvance(Span target, scoped ReadOnlySpan value)
+ {
+ value.CopyTo(target);
+ return target.Slice(value.Length);
+ }
+
+ private static RedisChannel BuildKeySpace(in RedisKey key, int? database, RedisChannelOptions options)
+ {
+ int keyLen;
+ if (key.IsNull)
+ {
+ if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(key));
+ keyLen = 1;
+ }
+ else
+ {
+ keyLen = key.TotalLength();
+ if (keyLen == 0) throw new ArgumentOutOfRangeException(nameof(key));
+ }
+
+ var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options);
+
+ // __keyspace@{db}__:{key}
+ var arr = new byte[14 + db.Length + keyLen];
+
+ var target = AppendAndAdvance(arr.AsSpan(), "__keyspace@"u8);
+ target = AppendAndAdvance(target, db);
+ target = AppendAndAdvance(target, "__:"u8);
+ Debug.Assert(keyLen == target.Length); // should have exactly "len" bytes remaining
+ if (key.IsNull)
+ {
+ target[0] = (byte)'*';
+ target = target.Slice(1);
+ }
+ else
+ {
+ target = target.Slice(key.CopyTo(target));
+ }
+ Debug.Assert(target.IsEmpty); // should have calculated length correctly
+ return new RedisChannel(arr, options);
+ }
internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
@@ -351,7 +487,7 @@ public static implicit operator RedisChannel(byte[]? key)
{
return Encoding.UTF8.GetString(arr);
}
- catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
+ catch (Exception e) when // Only catch exception thrown by Encoding.UTF8.GetString
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
{
return BitConverter.ToString(arr);
diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs
index f13571c4e..d8b62e2b9 100644
--- a/src/StackExchange.Redis/RedisDatabase.cs
+++ b/src/StackExchange.Redis/RedisDatabase.cs
@@ -1901,7 +1901,7 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
@@ -1909,7 +1909,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs
index 9ade78c2d..5d083164b 100644
--- a/src/StackExchange.Redis/RedisSubscriber.cs
+++ b/src/StackExchange.Redis/RedisSubscriber.cs
@@ -393,7 +393,7 @@ private static void ThrowIfNull(in RedisChannel channel)
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
@@ -401,7 +401,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
diff --git a/src/StackExchange.Redis/RedisValue.cs b/src/StackExchange.Redis/RedisValue.cs
index d306ca0d0..1f6947460 100644
--- a/src/StackExchange.Redis/RedisValue.cs
+++ b/src/StackExchange.Redis/RedisValue.cs
@@ -1245,5 +1245,16 @@ internal ValueCondition Digest()
return digest;
}
}
+
+ internal bool TryGetSpan(out ReadOnlySpan span)
+ {
+ if (_objectOrSentinel == Sentinel_Raw)
+ {
+ span = _memory.Span;
+ return true;
+ }
+ span = default;
+ return false;
+ }
}
}
diff --git a/tests/StackExchange.Redis.Tests/FastHashTests.cs b/tests/StackExchange.Redis.Tests/FastHashTests.cs
index 418198cfd..a032cfc80 100644
--- a/tests/StackExchange.Redis.Tests/FastHashTests.cs
+++ b/tests/StackExchange.Redis.Tests/FastHashTests.cs
@@ -2,13 +2,14 @@
using System.Runtime.InteropServices;
using System.Text;
using Xunit;
+using Xunit.Sdk;
#pragma warning disable CS8981, SA1134, SA1300, SA1303, SA1502 // names are weird in this test!
// ReSharper disable InconsistentNaming - to better represent expected literals
// ReSharper disable IdentifierTypo
namespace StackExchange.Redis.Tests;
-public partial class FastHashTests
+public partial class FastHashTests(ITestOutputHelper log)
{
// note: if the hashing algorithm changes, we can update the last parameter freely; it doesn't matter
// what it *is* - what matters is that we can see that it has entropy between different values
@@ -83,6 +84,46 @@ public void FastHashIs_Long()
Assert.False(abcdefghijklmnopqrst.Is(hash, value));
}
+ [Fact]
+ public void KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths()
+ {
+ // Use reflection to find all nested types in KeyNotificationTypeFastHash
+ var fastHashType = typeof(KeyNotificationTypeFastHash);
+ var nestedTypes = fastHashType.GetNestedTypes(System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);
+
+ int? minLength = null;
+ int? maxLength = null;
+
+ foreach (var nestedType in nestedTypes)
+ {
+ // Look for the Length field (generated by FastHash source generator)
+ var lengthField = nestedType.GetField("Length", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static);
+ if (lengthField != null && lengthField.FieldType == typeof(int))
+ {
+ var length = (int)lengthField.GetValue(null)!;
+
+ if (minLength == null || length < minLength)
+ {
+ minLength = length;
+ }
+
+ if (maxLength == null || length > maxLength)
+ {
+ maxLength = length;
+ }
+ }
+ }
+
+ // Assert that we found at least some nested types with Length fields
+ Assert.NotNull(minLength);
+ Assert.NotNull(maxLength);
+
+ // Assert that MinBytes and MaxBytes match the actual min/max lengths
+ log.WriteLine($"MinBytes: {KeyNotificationTypeFastHash.MinBytes}, MaxBytes: {KeyNotificationTypeFastHash.MaxBytes}");
+ Assert.Equal(KeyNotificationTypeFastHash.MinBytes, minLength.Value);
+ Assert.Equal(KeyNotificationTypeFastHash.MaxBytes, maxLength.Value);
+ }
+
[FastHash] private static partial class a { }
[FastHash] private static partial class ab { }
[FastHash] private static partial class abc { }
diff --git a/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs
new file mode 100644
index 000000000..b9548eb44
--- /dev/null
+++ b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs
@@ -0,0 +1,500 @@
+using System;
+using System.Buffers;
+using System.Text;
+using Xunit;
+
+namespace StackExchange.Redis.Tests;
+
+public class KeyNotificationTests(ITestOutputHelper log)
+{
+ [Fact]
+ public void Keyspace_Del_ParsesCorrectly()
+ {
+ // __keyspace@1__:mykey with payload "del"
+ var channel = RedisChannel.Literal("__keyspace@1__:mykey");
+ RedisValue value = "del";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.False(notification.IsKeyEvent);
+ Assert.Equal(1, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ Assert.Equal(5, notification.KeyByteCount);
+ }
+
+ [Fact]
+ public void Keyevent_Del_ParsesCorrectly()
+ {
+ // __keyevent@42__:del with value "mykey"
+ var channel = RedisChannel.Literal("__keyevent@42__:del");
+ RedisValue value = "mykey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.False(notification.IsKeySpace);
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(42, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ Assert.Equal(5, notification.KeyByteCount);
+ }
+
+ [Fact]
+ public void Keyspace_Set_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_Expire_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@5__:expire");
+ RedisValue value = "session:12345";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(5, notification.Database);
+ Assert.Equal(KeyNotificationType.Expire, notification.Type);
+ Assert.Equal("session:12345", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_Expired_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@3__:cache:item");
+ RedisValue value = "expired";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(3, notification.Database);
+ Assert.Equal(KeyNotificationType.Expired, notification.Type);
+ Assert.Equal("cache:item", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_LPush_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:lpush");
+ RedisValue value = "queue:tasks";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.LPush, notification.Type);
+ Assert.Equal("queue:tasks", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_HSet_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@2__:user:1000");
+ RedisValue value = "hset";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(2, notification.Database);
+ Assert.Equal(KeyNotificationType.HSet, notification.Type);
+ Assert.Equal("user:1000", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_ZAdd_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@7__:zadd");
+ RedisValue value = "leaderboard";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(7, notification.Database);
+ Assert.Equal(KeyNotificationType.ZAdd, notification.Type);
+ Assert.Equal("leaderboard", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void TryCopyKey_WorksCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ var lease = ArrayPool.Shared.Rent(20);
+ Span buffer = lease.AsSpan(0, 20);
+ Assert.True(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(7, bytesWritten);
+ Assert.Equal("testkey", Encoding.UTF8.GetString(lease, 0, bytesWritten));
+ ArrayPool.Shared.Return(lease);
+ }
+
+ [Fact]
+ public void TryCopyKey_FailsWithSmallBuffer()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Span buffer = stackalloc byte[3]; // too small
+ Assert.False(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(0, bytesWritten);
+ }
+
+ [Fact]
+ public void InvalidChannel_ReturnsFalse()
+ {
+ var channel = RedisChannel.Literal("regular:channel");
+ RedisValue value = "data";
+
+ Assert.False(KeyNotification.TryParse(in channel, in value, out var notification));
+ }
+
+ [Fact]
+ public void InvalidKeyspaceChannel_MissingDelimiter_ReturnsFalse()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__"); // missing the key part
+ RedisValue value = "set";
+
+ Assert.False(KeyNotification.TryParse(in channel, in value, out var notification));
+ }
+
+ [Fact]
+ public void Keyspace_UnknownEventType_ReturnsUnknown()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:mykey");
+ RedisValue value = "unknownevent";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_UnknownEventType_ReturnsUnknown()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:unknownevent");
+ RedisValue value = "mykey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_WithColonInKey_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:user:session:12345");
+ RedisValue value = "del";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("user:session:12345", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_Evicted_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@1__:evicted");
+ RedisValue value = "cache:old";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(1, notification.Database);
+ Assert.Equal(KeyNotificationType.Evicted, notification.Type);
+ Assert.Equal("cache:old", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_New_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:newkey");
+ RedisValue value = "new";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.New, notification.Type);
+ Assert.Equal("newkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_XGroupCreate_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:xgroup-create");
+ RedisValue value = "mystream";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.XGroupCreate, notification.Type);
+ Assert.Equal("mystream", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_TypeChanged_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:mykey");
+ RedisValue value = "type_changed";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.TypeChanged, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_HighDatabaseNumber_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@999__:set");
+ RedisValue value = "testkey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(999, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_NonIntegerDatabase_ParsesWellEnough()
+ {
+ var channel = RedisChannel.Literal("__keyevent@abc__:set");
+ RedisValue value = "testkey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(-1, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void DefaultKeyNotification_HasExpectedProperties()
+ {
+ var notification = default(KeyNotification);
+
+ Assert.False(notification.IsKeySpace);
+ Assert.False(notification.IsKeyEvent);
+ Assert.Equal(-1, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.True(notification.GetKey().IsNull);
+ Assert.Equal(0, notification.KeyByteCount);
+ Assert.True(notification.Channel.IsNull);
+ Assert.True(notification.Value.IsNull);
+
+ // TryCopyKey should return false and write 0 bytes
+ Span buffer = stackalloc byte[10];
+ Assert.False(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(0, bytesWritten);
+ }
+
+ [Theory]
+ [InlineData(KeyNotificationTypeFastHash.append.Text, KeyNotificationType.Append)]
+ [InlineData(KeyNotificationTypeFastHash.copy.Text, KeyNotificationType.Copy)]
+ [InlineData(KeyNotificationTypeFastHash.del.Text, KeyNotificationType.Del)]
+ [InlineData(KeyNotificationTypeFastHash.expire.Text, KeyNotificationType.Expire)]
+ [InlineData(KeyNotificationTypeFastHash.hdel.Text, KeyNotificationType.HDel)]
+ [InlineData(KeyNotificationTypeFastHash.hexpired.Text, KeyNotificationType.HExpired)]
+ [InlineData(KeyNotificationTypeFastHash.hincrbyfloat.Text, KeyNotificationType.HIncrByFloat)]
+ [InlineData(KeyNotificationTypeFastHash.hincrby.Text, KeyNotificationType.HIncrBy)]
+ [InlineData(KeyNotificationTypeFastHash.hpersist.Text, KeyNotificationType.HPersist)]
+ [InlineData(KeyNotificationTypeFastHash.hset.Text, KeyNotificationType.HSet)]
+ [InlineData(KeyNotificationTypeFastHash.incrbyfloat.Text, KeyNotificationType.IncrByFloat)]
+ [InlineData(KeyNotificationTypeFastHash.incrby.Text, KeyNotificationType.IncrBy)]
+ [InlineData(KeyNotificationTypeFastHash.linsert.Text, KeyNotificationType.LInsert)]
+ [InlineData(KeyNotificationTypeFastHash.lpop.Text, KeyNotificationType.LPop)]
+ [InlineData(KeyNotificationTypeFastHash.lpush.Text, KeyNotificationType.LPush)]
+ [InlineData(KeyNotificationTypeFastHash.lrem.Text, KeyNotificationType.LRem)]
+ [InlineData(KeyNotificationTypeFastHash.lset.Text, KeyNotificationType.LSet)]
+ [InlineData(KeyNotificationTypeFastHash.ltrim.Text, KeyNotificationType.LTrim)]
+ [InlineData(KeyNotificationTypeFastHash.move_from.Text, KeyNotificationType.MoveFrom)]
+ [InlineData(KeyNotificationTypeFastHash.move_to.Text, KeyNotificationType.MoveTo)]
+ [InlineData(KeyNotificationTypeFastHash.persist.Text, KeyNotificationType.Persist)]
+ [InlineData(KeyNotificationTypeFastHash.rename_from.Text, KeyNotificationType.RenameFrom)]
+ [InlineData(KeyNotificationTypeFastHash.rename_to.Text, KeyNotificationType.RenameTo)]
+ [InlineData(KeyNotificationTypeFastHash.restore.Text, KeyNotificationType.Restore)]
+ [InlineData(KeyNotificationTypeFastHash.rpop.Text, KeyNotificationType.RPop)]
+ [InlineData(KeyNotificationTypeFastHash.rpush.Text, KeyNotificationType.RPush)]
+ [InlineData(KeyNotificationTypeFastHash.sadd.Text, KeyNotificationType.SAdd)]
+ [InlineData(KeyNotificationTypeFastHash.set.Text, KeyNotificationType.Set)]
+ [InlineData(KeyNotificationTypeFastHash.setrange.Text, KeyNotificationType.SetRange)]
+ [InlineData(KeyNotificationTypeFastHash.sortstore.Text, KeyNotificationType.SortStore)]
+ [InlineData(KeyNotificationTypeFastHash.srem.Text, KeyNotificationType.SRem)]
+ [InlineData(KeyNotificationTypeFastHash.spop.Text, KeyNotificationType.SPop)]
+ [InlineData(KeyNotificationTypeFastHash.xadd.Text, KeyNotificationType.XAdd)]
+ [InlineData(KeyNotificationTypeFastHash.xdel.Text, KeyNotificationType.XDel)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_createconsumer.Text, KeyNotificationType.XGroupCreateConsumer)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_create.Text, KeyNotificationType.XGroupCreate)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_delconsumer.Text, KeyNotificationType.XGroupDelConsumer)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_destroy.Text, KeyNotificationType.XGroupDestroy)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_setid.Text, KeyNotificationType.XGroupSetId)]
+ [InlineData(KeyNotificationTypeFastHash.xsetid.Text, KeyNotificationType.XSetId)]
+ [InlineData(KeyNotificationTypeFastHash.xtrim.Text, KeyNotificationType.XTrim)]
+ [InlineData(KeyNotificationTypeFastHash.zadd.Text, KeyNotificationType.ZAdd)]
+ [InlineData(KeyNotificationTypeFastHash.zdiffstore.Text, KeyNotificationType.ZDiffStore)]
+ [InlineData(KeyNotificationTypeFastHash.zinterstore.Text, KeyNotificationType.ZInterStore)]
+ [InlineData(KeyNotificationTypeFastHash.zunionstore.Text, KeyNotificationType.ZUnionStore)]
+ [InlineData(KeyNotificationTypeFastHash.zincr.Text, KeyNotificationType.ZIncr)]
+ [InlineData(KeyNotificationTypeFastHash.zrembyrank.Text, KeyNotificationType.ZRemByRank)]
+ [InlineData(KeyNotificationTypeFastHash.zrembyscore.Text, KeyNotificationType.ZRemByScore)]
+ [InlineData(KeyNotificationTypeFastHash.zrem.Text, KeyNotificationType.ZRem)]
+ [InlineData(KeyNotificationTypeFastHash.expired.Text, KeyNotificationType.Expired)]
+ [InlineData(KeyNotificationTypeFastHash.evicted.Text, KeyNotificationType.Evicted)]
+ [InlineData(KeyNotificationTypeFastHash._new.Text, KeyNotificationType.New)]
+ [InlineData(KeyNotificationTypeFastHash.overwritten.Text, KeyNotificationType.Overwritten)]
+ [InlineData(KeyNotificationTypeFastHash.type_changed.Text, KeyNotificationType.TypeChanged)]
+ public unsafe void FastHashParse_AllKnownValues_ParseCorrectly(string raw, KeyNotificationType parsed)
+ {
+ var arr = ArrayPool.Shared.Rent(Encoding.UTF8.GetMaxByteCount(raw.Length));
+ int bytes;
+ fixed (byte* bPtr = arr) // encode into the buffer
+ {
+ fixed (char* cPtr = raw)
+ {
+ bytes = Encoding.UTF8.GetBytes(cPtr, raw.Length, bPtr, arr.Length);
+ }
+ }
+
+ var result = KeyNotificationTypeFastHash.Parse(arr.AsSpan(0, bytes));
+ log.WriteLine($"Parsed '{raw}' as {result}");
+ Assert.Equal(parsed, result);
+
+ // and the other direction:
+ var fetchedBytes = KeyNotificationTypeFastHash.GetRawBytes(parsed);
+ string fetched;
+ fixed (byte* bPtr = fetchedBytes)
+ {
+ fetched = Encoding.UTF8.GetString(bPtr, fetchedBytes.Length);
+ }
+
+ log.WriteLine($"Fetched '{raw}'");
+ Assert.Equal(raw, fetched);
+
+ ArrayPool.Shared.Return(arr);
+ }
+
+ [Fact]
+ public void CreateKeySpaceNotification_Valid()
+ {
+ var channel = RedisChannel.KeySpace("abc", 42);
+ Assert.Equal("__keyspace@42__:abc", channel.ToString());
+ Assert.False(channel.IsMultiNode);
+ Assert.True(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ Assert.False(channel.IsPattern);
+ }
+
+ [Theory]
+ [InlineData(null, null, "__keyspace@*__:*")]
+ [InlineData("abc*", null, "__keyspace@*__:abc*")]
+ [InlineData(null, 42, "__keyspace@42__:*")]
+ [InlineData("abc*", 42, "__keyspace@42__:abc*")]
+ public void CreateKeySpaceNotificationPattern(string? pattern, int? database, string expected)
+ {
+ var channel = RedisChannel.KeySpacePattern(pattern, database);
+ Assert.Equal(expected, channel.ToString());
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ Assert.True(channel.IsPattern);
+ }
+
+ [Theory]
+ [InlineData(KeyNotificationType.Set, null, "__keyevent@*__:set", true)]
+ [InlineData(KeyNotificationType.XGroupCreate, null, "__keyevent@*__:xgroup-create", true)]
+ [InlineData(KeyNotificationType.Set, 42, "__keyevent@42__:set", false)]
+ [InlineData(KeyNotificationType.XGroupCreate, 42, "__keyevent@42__:xgroup-create", false)]
+ public void CreateKeyEventNotification(KeyNotificationType type, int? database, string expected, bool isPattern)
+ {
+ var channel = RedisChannel.KeyEvent(type, database);
+ Assert.Equal(expected, channel.ToString());
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ if (isPattern)
+ {
+ Assert.True(channel.IsPattern);
+ }
+ else
+ {
+ Assert.False(channel.IsPattern);
+ }
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeySpace_SingleKeyIsKeyRouted()
+ {
+ var channel = RedisChannel.KeySpace("abc", 42);
+ Assert.False(channel.IsMultiNode);
+ Assert.True(channel.IsKeyRouted);
+ Assert.True(channel.WithKeyRouting().IsKeyRouted); // no change, still key-routed
+ Assert.Equal(RedisCommand.PUBLISH, channel.GetPublishCommand());
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeySpacePattern()
+ {
+ var channel = RedisChannel.KeySpacePattern("abc", 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeyEvent()
+ {
+ var channel = RedisChannel.KeyEvent(KeyNotificationType.Set, 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeyEvent_Custom()
+ {
+ var channel = RedisChannel.KeyEvent("foo"u8, 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void KeyEventPrefix_KeySpacePrefix_Length_Matches()
+ {
+ // this is a sanity check for the parsing step in KeyNotification.TryParse
+ Assert.Equal(KeyNotificationChannels.KeySpacePrefix.Length, KeyNotificationChannels.KeyEventPrefix.Length);
+ }
+}