diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings index 216edbcca..8dd9095d9 100644 --- a/StackExchange.Redis.sln.DotSettings +++ b/StackExchange.Redis.sln.DotSettings @@ -12,9 +12,12 @@ True True True + True True True + True True + True True True True diff --git a/src/StackExchange.Redis/ChannelMessage.cs b/src/StackExchange.Redis/ChannelMessage.cs new file mode 100644 index 000000000..330aedee4 --- /dev/null +++ b/src/StackExchange.Redis/ChannelMessage.cs @@ -0,0 +1,64 @@ +namespace StackExchange.Redis; + +/// +/// Represents a message that is broadcast via publish/subscribe. +/// +public readonly struct ChannelMessage +{ + // this is *smaller* than storing a RedisChannel for the subscribed channel + private readonly ChannelMessageQueue _queue; + + /// + /// The Channel:Message string representation. + /// + public override string ToString() => ((string?)Channel) + ":" + ((string?)Message); + + /// + public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode(); + + /// + public override bool Equals(object? obj) => obj is ChannelMessage cm + && cm.Channel == Channel && cm.Message == Message; + + internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value) + { + _queue = queue; + _channel = channel; + _message = value; + } + + /// + /// The channel that the subscription was created from. + /// + public RedisChannel SubscriptionChannel => _queue.Channel; + + private readonly RedisChannel _channel; + + /// + /// The channel that the message was broadcast to. + /// + public RedisChannel Channel => _channel; + + private readonly RedisValue _message; + + /// + /// The value that was broadcast. + /// + public RedisValue Message => _message; + + /// + /// Checks if 2 messages are .Equal(). + /// + public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right); + + /// + /// Checks if 2 messages are not .Equal(). + /// + public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right); + + /// + /// If the channel is either a keyspace or keyevent notification, resolve the key and event type. + /// + public bool TryParseKeyNotification(out KeyNotification notification) + => KeyNotification.TryParse(in _channel, in _message, out notification); +} diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs index e58fb393b..9f962e52a 100644 --- a/src/StackExchange.Redis/ChannelMessageQueue.cs +++ b/src/StackExchange.Redis/ChannelMessageQueue.cs @@ -1,385 +1,353 @@ using System; +using System.Buffers.Text; using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; #if NETCOREAPP3_1 +using System.Diagnostics; using System.Reflection; #endif -namespace StackExchange.Redis +namespace StackExchange.Redis; + +/// +/// Represents a message queue of ordered pub/sub notifications. +/// +/// +/// To create a ChannelMessageQueue, use +/// or . +/// +public sealed class ChannelMessageQueue : IAsyncEnumerable { + private readonly Channel _queue; + /// - /// Represents a message that is broadcast via publish/subscribe. + /// The Channel that was subscribed for this queue. /// - public readonly struct ChannelMessage - { - // this is *smaller* than storing a RedisChannel for the subscribed channel - private readonly ChannelMessageQueue _queue; + public RedisChannel Channel { get; } - /// - /// The Channel:Message string representation. - /// - public override string ToString() => ((string?)Channel) + ":" + ((string?)Message); + private RedisSubscriber? _parent; - /// - public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode(); - - /// - public override bool Equals(object? obj) => obj is ChannelMessage cm - && cm.Channel == Channel && cm.Message == Message; + /// + /// The string representation of this channel. + /// + public override string? ToString() => (string?)Channel; - internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value) - { - _queue = queue; - Channel = channel; - Message = value; - } + /// + /// An awaitable task the indicates completion of the queue (including drain of data). + /// + public Task Completion => _queue.Reader.Completion; - /// - /// The channel that the subscription was created from. - /// - public RedisChannel SubscriptionChannel => _queue.Channel; - - /// - /// The channel that the message was broadcast to. - /// - public RedisChannel Channel { get; } - - /// - /// The value that was broadcast. - /// - public RedisValue Message { get; } - - /// - /// Checks if 2 messages are .Equal(). - /// - public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right); - - /// - /// Checks if 2 messages are not .Equal(). - /// - public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right); + internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent) + { + Channel = redisChannel; + _parent = parent; + _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions); } - /// - /// Represents a message queue of ordered pub/sub notifications. - /// - /// - /// To create a ChannelMessageQueue, use - /// or . - /// - public sealed class ChannelMessageQueue : IAsyncEnumerable + private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions { - private readonly Channel _queue; + SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false, + }; - /// - /// The Channel that was subscribed for this queue. - /// - public RedisChannel Channel { get; } - private RedisSubscriber? _parent; + private void Write(in RedisChannel channel, in RedisValue value) + { + var writer = _queue.Writer; + writer.TryWrite(new ChannelMessage(this, channel, value)); + } - /// - /// The string representation of this channel. - /// - public override string? ToString() => (string?)Channel; + /// + /// Consume a message from the channel. + /// + /// The to use. + public ValueTask ReadAsync(CancellationToken cancellationToken = default) + => _queue.Reader.ReadAsync(cancellationToken); - /// - /// An awaitable task the indicates completion of the queue (including drain of data). - /// - public Task Completion => _queue.Reader.Completion; + /// + /// Attempt to synchronously consume a message from the channel. + /// + /// The read from the Channel. + public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item); - internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent) + /// + /// Attempt to query the backlog length of the queue. + /// + /// The (approximate) count of items in the Channel. + public bool TryGetCount(out int count) + { + // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present +#if NETCOREAPP3_1 + // get this using the reflection + try { - Channel = redisChannel; - _parent = parent; - _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions); + var prop = + _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic); + if (prop is not null) + { + count = (int)prop.GetValue(_queue)!; + return true; + } } - - private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions + catch (Exception ex) { - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }; - - private void Write(in RedisChannel channel, in RedisValue value) + Debug.WriteLine(ex.Message); // but ignore + } +#else + var reader = _queue.Reader; + if (reader.CanCount) { - var writer = _queue.Writer; - writer.TryWrite(new ChannelMessage(this, channel, value)); + count = reader.Count; + return true; } +#endif - /// - /// Consume a message from the channel. - /// - /// The to use. - public ValueTask ReadAsync(CancellationToken cancellationToken = default) - => _queue.Reader.ReadAsync(cancellationToken); - - /// - /// Attempt to synchronously consume a message from the channel. - /// - /// The read from the Channel. - public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item); - - /// - /// Attempt to query the backlog length of the queue. - /// - /// The (approximate) count of items in the Channel. - public bool TryGetCount(out int count) + count = 0; + return false; + } + + private Delegate? _onMessageHandler; + + private void AssertOnMessage(Delegate handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null) + throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed"); + } + + /// + /// Create a message loop that processes messages sequentially. + /// + /// The handler to run when receiving a message. + public void OnMessage(Action handler) + { + AssertOnMessage(handler); + + ThreadPool.QueueUserWorkItem( + state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this); + } + + private async Task OnMessageSyncImpl() + { + var handler = (Action?)_onMessageHandler; + while (!Completion.IsCompleted) { - // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present -#if NETCOREAPP3_1 - // get this using the reflection + ChannelMessage next; try { - var prop = _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic); - if (prop is not null) - { - count = (int)prop.GetValue(_queue)!; - return true; - } + if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch { } -#else - var reader = _queue.Reader; - if (reader.CanCount) + catch (ChannelClosedException) { break; } // expected + catch (Exception ex) { - count = reader.Count; - return true; + _parent?.multiplexer?.OnInternalError(ex); + break; } -#endif - count = default; - return false; + try { handler?.Invoke(next); } + catch { } // matches MessageCompletable } + } - private Delegate? _onMessageHandler; - private void AssertOnMessage(Delegate handler) + internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + { + if (queue != null) { - if (handler == null) throw new ArgumentNullException(nameof(handler)); - if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null) - throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed"); + // insert at the start of the linked-list + ChannelMessageQueue? old; + do + { + old = Volatile.Read(ref head); + queue._next = old; + } + // format and validator disagree on newline... + while (Interlocked.CompareExchange(ref head, queue, old) != old); } + } - /// - /// Create a message loop that processes messages sequentially. - /// - /// The handler to run when receiving a message. - public void OnMessage(Action handler) - { - AssertOnMessage(handler); + /// + /// Create a message loop that processes messages sequentially. + /// + /// The handler to execute when receiving a message. + public void OnMessage(Func handler) + { + AssertOnMessage(handler); - ThreadPool.QueueUserWorkItem( - state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this); - } + ThreadPool.QueueUserWorkItem( + state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this); + } - private async Task OnMessageSyncImpl() + internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + { + if (queue is null) { - var handler = (Action?)_onMessageHandler; - while (!Completion.IsCompleted) - { - ChannelMessage next; - try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch (ChannelClosedException) { break; } // expected - catch (Exception ex) - { - _parent?.multiplexer?.OnInternalError(ex); - break; - } - - try { handler?.Invoke(next); } - catch { } // matches MessageCompletable - } + return; } - internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + bool found; + // if we fail due to a conflict, re-do from start + do { - if (queue != null) + var current = Volatile.Read(ref head); + if (current == null) return; // no queue? nothing to do + if (current == queue) { - // insert at the start of the linked-list - ChannelMessageQueue? old; - do + found = true; + // found at the head - then we need to change the head + if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current) { - old = Volatile.Read(ref head); - queue._next = old; + return; // success } - while (Interlocked.CompareExchange(ref head, queue, old) != old); } - } - - /// - /// Create a message loop that processes messages sequentially. - /// - /// The handler to execute when receiving a message. - public void OnMessage(Func handler) - { - AssertOnMessage(handler); - - ThreadPool.QueueUserWorkItem( - state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this); - } - - internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue) - { - if (queue is null) + else { - return; - } - - bool found; - // if we fail due to a conflict, re-do from start - do - { - var current = Volatile.Read(ref head); - if (current == null) return; // no queue? nothing to do - if (current == queue) - { - found = true; - // found at the head - then we need to change the head - if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current) - { - return; // success - } - } - else + ChannelMessageQueue? previous = current; + current = Volatile.Read(ref previous._next); + found = false; + do { - ChannelMessageQueue? previous = current; - current = Volatile.Read(ref previous._next); - found = false; - do + if (current == queue) { - if (current == queue) + found = true; + // found it, not at the head; remove the node + if (Interlocked.CompareExchange( + ref previous._next, + Volatile.Read(ref current._next), + current) == current) { - found = true; - // found it, not at the head; remove the node - if (Interlocked.CompareExchange(ref previous._next, Volatile.Read(ref current._next), current) == current) - { - return; // success - } - else - { - break; // exit the inner loop, and repeat the outer loop - } + return; // success + } + else + { + break; // exit the inner loop, and repeat the outer loop } - previous = current; - current = Volatile.Read(ref previous!._next); } - while (current != null); + + previous = current; + current = Volatile.Read(ref previous!._next); } + // format and validator disagree on newline... + while (current != null); } - while (found); } + // format and validator disagree on newline... + while (found); + } - internal static int Count(ref ChannelMessageQueue? head) + internal static int Count(ref ChannelMessageQueue? head) + { + var current = Volatile.Read(ref head); + int count = 0; + while (current != null) { - var current = Volatile.Read(ref head); - int count = 0; - while (current != null) - { - count++; - current = Volatile.Read(ref current._next); - } - return count; + count++; + current = Volatile.Read(ref current._next); } - internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message) + return count; + } + + internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message) + { + var current = Volatile.Read(ref head); + while (current != null) { - var current = Volatile.Read(ref head); - while (current != null) - { - current.Write(channel, message); - current = Volatile.Read(ref current._next); - } + current.Write(channel, message); + current = Volatile.Read(ref current._next); } + } - private ChannelMessageQueue? _next; + private ChannelMessageQueue? _next; - private async Task OnMessageAsyncImpl() + private async Task OnMessageAsyncImpl() + { + var handler = (Func?)_onMessageHandler; + while (!Completion.IsCompleted) { - var handler = (Func?)_onMessageHandler; - while (!Completion.IsCompleted) + ChannelMessage next; + try { - ChannelMessage next; - try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch (ChannelClosedException) { break; } // expected - catch (Exception ex) - { - _parent?.multiplexer?.OnInternalError(ex); - break; - } - - try - { - var task = handler?.Invoke(next); - if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait(); - } - catch { } // matches MessageCompletable + if (!TryRead(out next)) next = await ReadAsync().ForAwait(); + } + catch (ChannelClosedException) { break; } // expected + catch (Exception ex) + { + _parent?.multiplexer?.OnInternalError(ex); + break; } - } - internal static void MarkAllCompleted(ref ChannelMessageQueue? head) - { - var current = Interlocked.Exchange(ref head, null); - while (current != null) + try { - current.MarkCompleted(); - current = Volatile.Read(ref current._next); + var task = handler?.Invoke(next); + if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait(); } + catch { } // matches MessageCompletable } + } - private void MarkCompleted(Exception? error = null) + internal static void MarkAllCompleted(ref ChannelMessageQueue? head) + { + var current = Interlocked.Exchange(ref head, null); + while (current != null) { - _parent = null; - _queue.Writer.TryComplete(error); + current.MarkCompleted(); + current = Volatile.Read(ref current._next); } + } - internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) - { - var parent = _parent; - _parent = null; - parent?.UnsubscribeAsync(Channel, null, this, flags); - _queue.Writer.TryComplete(error); - } + private void MarkCompleted(Exception? error = null) + { + _parent = null; + _queue.Writer.TryComplete(error); + } - internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + { + var parent = _parent; + _parent = null; + parent?.UnsubscribeAsync(Channel, null, this, flags); + _queue.Writer.TryComplete(error); + } + + internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + { + var parent = _parent; + _parent = null; + if (parent != null) { - var parent = _parent; - _parent = null; - if (parent != null) - { - await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait(); - } - _queue.Writer.TryComplete(error); + await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait(); } - /// - /// Stop receiving messages on this channel. - /// - /// The flags to use when unsubscribing. - public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags); + _queue.Writer.TryComplete(error); + } - /// - /// Stop receiving messages on this channel. - /// - /// The flags to use when unsubscribing. - public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); + /// + /// Stop receiving messages on this channel. + /// + /// The flags to use when unsubscribing. + public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags); - /// + /// + /// Stop receiving messages on this channel. + /// + /// The flags to use when unsubscribing. + public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); + + /// #if NETCOREAPP3_0_OR_GREATER - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) - => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + // ReSharper disable once MethodSupportsCancellation - provided in GetAsyncEnumerator + => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); #else - public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + while (_queue.Reader.TryRead(out var item)) { - while (_queue.Reader.TryRead(out var item)) - { - yield return item; - } + yield return item; } } -#endif } +#endif } diff --git a/src/StackExchange.Redis/KeyNotification.cs b/src/StackExchange.Redis/KeyNotification.cs new file mode 100644 index 000000000..f563019f3 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotification.cs @@ -0,0 +1,244 @@ +using System; +using System.Buffers.Text; +using System.Diagnostics; +using static StackExchange.Redis.KeyNotificationChannels; +namespace StackExchange.Redis; + +/// +/// Represents keyspace and keyevent notifications. +/// +public readonly struct KeyNotification +{ + /// + /// If the channel is either a keyspace or keyevent notification, parsed the data. + /// + public static bool TryParse(in RedisChannel channel, in RedisValue value, out KeyNotification notification) + { + // validate that it looks reasonable + var span = channel.Span; + + // KeySpaceStart and KeyEventStart are the same size, see KeyEventPrefix_KeySpacePrefix_Length_Matches + if (span.Length >= KeySpacePrefix.Length + MinSuffixBytes) + { + // check that the prefix is valid, i.e. "__keyspace@" or "__keyevent@" + var prefix = span.Slice(0, KeySpacePrefix.Length); + var hash = prefix.Hash64(); + switch (hash) + { + case KeySpacePrefix.Hash when KeySpacePrefix.Is(hash, prefix): + case KeyEventPrefix.Hash when KeyEventPrefix.Is(hash, prefix): + // check that there is *something* non-empty after the prefix, with __: as the suffix (we don't verify *what*) + if (span.Slice(KeySpacePrefix.Length).IndexOf("__:"u8) > 0) + { + notification = new KeyNotification(in channel, in value); + return true; + } + + break; + } + } + + notification = default; + return false; + } + + private const int MinSuffixBytes = 5; // need "0__:x" or similar after prefix + + /// + /// The channel associated with this notification. + /// + public RedisChannel Channel => _channel; + + /// + /// The payload associated with this notification. + /// + public RedisValue Value => _value; + + // effectively we just wrap a channel, but: we've pre-validated that things make sense + private readonly RedisChannel _channel; + private readonly RedisValue _value; + + internal KeyNotification(in RedisChannel channel, in RedisValue value) + { + _channel = channel; + _value = value; + } + + /// + /// The database the key is in. If the database cannot be parsed, -1 is returned. + /// + public int Database + { + get + { + // prevalidated format, so we can just skip past the prefix (except for the default value) + if (_channel.IsNull) return -1; + var span = _channel.Span.Slice(KeySpacePrefix.Length); // also works for KeyEventPrefix + var end = span.IndexOf((byte)'_'); // expecting "__:foo" - we'll just stop at the underscore + if (end <= 0) return -1; + + span = span.Slice(0, end); + return Utf8Parser.TryParse(span, out int database, out var bytes) + && bytes == end ? database : -1; + } + } + + /// + /// The key associated with this event. + /// + /// Note that this will allocate a copy of the key bytes; to avoid allocations, + /// the and APIs can be used. + public RedisKey GetKey() + { + if (IsKeySpace) + { + // then the channel contains the key, and the payload contains the event-type + return ChannelSuffix.ToArray(); // create an isolated copy + } + + if (IsKeyEvent) + { + // then the channel contains the event-type, and the payload contains the key + return (byte[]?)Value; // todo: this could probably side-step + } + + return RedisKey.Null; + } + + /// + /// Get the number of bytes in the key. + /// + public int KeyByteCount + { + get + { + if (IsKeySpace) + { + return ChannelSuffix.Length; + } + + if (IsKeyEvent) + { + return _value.GetByteCount(); + } + + return 0; + } + } + + /// + /// Attempt to copy the bytes from the key to a buffer, returning the number of bytes written. + /// + public bool TryCopyKey(Span destination, out int bytesWritten) + { + if (IsKeySpace) + { + var suffix = ChannelSuffix; + bytesWritten = suffix.Length; // assume success + if (bytesWritten <= destination.Length) + { + suffix.CopyTo(destination); + return true; + } + } + + if (IsKeyEvent) + { + bytesWritten = _value.GetByteCount(); + if (bytesWritten <= destination.Length) + { + var tmp = _value.CopyTo(destination); + Debug.Assert(tmp == bytesWritten); + return true; + } + } + + bytesWritten = 0; + return false; + } + + /// + /// Get the portion of the channel after the "__{keyspace|keyevent}@{db}__:". + /// + private ReadOnlySpan ChannelSuffix + { + get + { + var span = _channel.Span; + var index = span.IndexOf("__:"u8); + return index > 0 ? span.Slice(index + 3) : default; + } + } + + /// + /// The type of notification associated with this event, if it is well-known - otherwise . + /// + /// Unexpected values can be processed manually from the and . + public KeyNotificationType Type + { + get + { + if (IsKeySpace) + { + // then the channel contains the key, and the payload contains the event-type + var count = _value.GetByteCount(); + if (count >= KeyNotificationTypeFastHash.MinBytes & count <= KeyNotificationTypeFastHash.MaxBytes) + { + if (_value.TryGetSpan(out var direct)) + { + return KeyNotificationTypeFastHash.Parse(direct); + } + else + { + Span localCopy = stackalloc byte[KeyNotificationTypeFastHash.MaxBytes]; + return KeyNotificationTypeFastHash.Parse(localCopy.Slice(0, _value.CopyTo(localCopy))); + } + } + } + + if (IsKeyEvent) + { + // then the channel contains the event-type, and the payload contains the key + return KeyNotificationTypeFastHash.Parse(ChannelSuffix); + } + return KeyNotificationType.Unknown; + } + } + + /// + /// Indicates whether this notification originated from a keyspace notification, for example __keyspace@4__:mykey with payload set. + /// + public bool IsKeySpace + { + get + { + var span = _channel.Span; + return span.Length >= KeySpacePrefix.Length + MinSuffixBytes && KeySpacePrefix.Is(span.Hash64(), span.Slice(0, KeySpacePrefix.Length)); + } + } + + /// + /// Indicates whether this notification originated from a keyevent notification, for example __keyevent@4__:set with payload mykey. + /// + public bool IsKeyEvent + { + get + { + var span = _channel.Span; + return span.Length >= KeyEventPrefix.Length + MinSuffixBytes && KeyEventPrefix.Is(span.Hash64(), span.Slice(0, KeyEventPrefix.Length)); + } + } +} + +internal static partial class KeyNotificationChannels +{ + [FastHash("__keyspace@")] + internal static partial class KeySpacePrefix + { + } + + [FastHash("__keyevent@")] + internal static partial class KeyEventPrefix + { + } +} diff --git a/src/StackExchange.Redis/KeyNotificationType.cs b/src/StackExchange.Redis/KeyNotificationType.cs new file mode 100644 index 000000000..cc4c74ef1 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotificationType.cs @@ -0,0 +1,69 @@ +namespace StackExchange.Redis; + +/// +/// The type of keyspace or keyevent notification. +/// +public enum KeyNotificationType +{ + // note: initially presented alphabetically, but: new values *must* be appended, not inserted + // (to preserve values of existing elements) +#pragma warning disable CS1591 // docs, redundant + Unknown = 0, + Append = 1, + Copy = 2, + Del = 3, + Expire = 4, + HDel = 5, + HExpired = 6, + HIncrByFloat = 7, + HIncrBy = 8, + HPersist = 9, + HSet = 10, + IncrByFloat = 11, + IncrBy = 12, + LInsert = 13, + LPop = 14, + LPush = 15, + LRem = 16, + LSet = 17, + LTrim = 18, + MoveFrom = 19, + MoveTo = 20, + Persist = 21, + RenameFrom = 22, + RenameTo = 23, + Restore = 24, + RPop = 25, + RPush = 26, + SAdd = 27, + Set = 28, + SetRange = 29, + SortStore = 30, + SRem = 31, + SPop = 32, + XAdd = 33, + XDel = 34, + XGroupCreateConsumer = 35, + XGroupCreate = 36, + XGroupDelConsumer = 37, + XGroupDestroy = 38, + XGroupSetId = 39, + XSetId = 40, + XTrim = 41, + ZAdd = 42, + ZDiffStore = 43, + ZInterStore = 44, + ZUnionStore = 45, + ZIncr = 46, + ZRemByRank = 47, + ZRemByScore = 48, + ZRem = 49, + + // side-effect notifications + Expired = 1000, + Evicted = 1001, + New = 1002, + Overwritten = 1003, + TypeChanged = 1004, // type_changed +#pragma warning restore CS1591 // docs, redundant +} diff --git a/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs new file mode 100644 index 000000000..bcf08bad2 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs @@ -0,0 +1,413 @@ +using System; + +namespace StackExchange.Redis; + +/// +/// Internal helper type for fast parsing of key notification types, using [FastHash]. +/// +internal static partial class KeyNotificationTypeFastHash +{ + // these are checked by KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths + public const int MinBytes = 3, MaxBytes = 21; + + public static KeyNotificationType Parse(ReadOnlySpan value) + { + var hash = value.Hash64(); + return hash switch + { + append.Hash when append.Is(hash, value) => KeyNotificationType.Append, + copy.Hash when copy.Is(hash, value) => KeyNotificationType.Copy, + del.Hash when del.Is(hash, value) => KeyNotificationType.Del, + expire.Hash when expire.Is(hash, value) => KeyNotificationType.Expire, + hdel.Hash when hdel.Is(hash, value) => KeyNotificationType.HDel, + hexpired.Hash when hexpired.Is(hash, value) => KeyNotificationType.HExpired, + hincrbyfloat.Hash when hincrbyfloat.Is(hash, value) => KeyNotificationType.HIncrByFloat, + hincrby.Hash when hincrby.Is(hash, value) => KeyNotificationType.HIncrBy, + hpersist.Hash when hpersist.Is(hash, value) => KeyNotificationType.HPersist, + hset.Hash when hset.Is(hash, value) => KeyNotificationType.HSet, + incrbyfloat.Hash when incrbyfloat.Is(hash, value) => KeyNotificationType.IncrByFloat, + incrby.Hash when incrby.Is(hash, value) => KeyNotificationType.IncrBy, + linsert.Hash when linsert.Is(hash, value) => KeyNotificationType.LInsert, + lpop.Hash when lpop.Is(hash, value) => KeyNotificationType.LPop, + lpush.Hash when lpush.Is(hash, value) => KeyNotificationType.LPush, + lrem.Hash when lrem.Is(hash, value) => KeyNotificationType.LRem, + lset.Hash when lset.Is(hash, value) => KeyNotificationType.LSet, + ltrim.Hash when ltrim.Is(hash, value) => KeyNotificationType.LTrim, + move_from.Hash when move_from.Is(hash, value) => KeyNotificationType.MoveFrom, + move_to.Hash when move_to.Is(hash, value) => KeyNotificationType.MoveTo, + persist.Hash when persist.Is(hash, value) => KeyNotificationType.Persist, + rename_from.Hash when rename_from.Is(hash, value) => KeyNotificationType.RenameFrom, + rename_to.Hash when rename_to.Is(hash, value) => KeyNotificationType.RenameTo, + restore.Hash when restore.Is(hash, value) => KeyNotificationType.Restore, + rpop.Hash when rpop.Is(hash, value) => KeyNotificationType.RPop, + rpush.Hash when rpush.Is(hash, value) => KeyNotificationType.RPush, + sadd.Hash when sadd.Is(hash, value) => KeyNotificationType.SAdd, + set.Hash when set.Is(hash, value) => KeyNotificationType.Set, + setrange.Hash when setrange.Is(hash, value) => KeyNotificationType.SetRange, + sortstore.Hash when sortstore.Is(hash, value) => KeyNotificationType.SortStore, + srem.Hash when srem.Is(hash, value) => KeyNotificationType.SRem, + spop.Hash when spop.Is(hash, value) => KeyNotificationType.SPop, + xadd.Hash when xadd.Is(hash, value) => KeyNotificationType.XAdd, + xdel.Hash when xdel.Is(hash, value) => KeyNotificationType.XDel, + xgroup_createconsumer.Hash when xgroup_createconsumer.Is(hash, value) => KeyNotificationType.XGroupCreateConsumer, + xgroup_create.Hash when xgroup_create.Is(hash, value) => KeyNotificationType.XGroupCreate, + xgroup_delconsumer.Hash when xgroup_delconsumer.Is(hash, value) => KeyNotificationType.XGroupDelConsumer, + xgroup_destroy.Hash when xgroup_destroy.Is(hash, value) => KeyNotificationType.XGroupDestroy, + xgroup_setid.Hash when xgroup_setid.Is(hash, value) => KeyNotificationType.XGroupSetId, + xsetid.Hash when xsetid.Is(hash, value) => KeyNotificationType.XSetId, + xtrim.Hash when xtrim.Is(hash, value) => KeyNotificationType.XTrim, + zadd.Hash when zadd.Is(hash, value) => KeyNotificationType.ZAdd, + zdiffstore.Hash when zdiffstore.Is(hash, value) => KeyNotificationType.ZDiffStore, + zinterstore.Hash when zinterstore.Is(hash, value) => KeyNotificationType.ZInterStore, + zunionstore.Hash when zunionstore.Is(hash, value) => KeyNotificationType.ZUnionStore, + zincr.Hash when zincr.Is(hash, value) => KeyNotificationType.ZIncr, + zrembyrank.Hash when zrembyrank.Is(hash, value) => KeyNotificationType.ZRemByRank, + zrembyscore.Hash when zrembyscore.Is(hash, value) => KeyNotificationType.ZRemByScore, + zrem.Hash when zrem.Is(hash, value) => KeyNotificationType.ZRem, + expired.Hash when expired.Is(hash, value) => KeyNotificationType.Expired, + evicted.Hash when evicted.Is(hash, value) => KeyNotificationType.Evicted, + _new.Hash when _new.Is(hash, value) => KeyNotificationType.New, + overwritten.Hash when overwritten.Is(hash, value) => KeyNotificationType.Overwritten, + type_changed.Hash when type_changed.Is(hash, value) => KeyNotificationType.TypeChanged, + _ => KeyNotificationType.Unknown, + }; + } + + internal static ReadOnlySpan GetRawBytes(KeyNotificationType type) + { + return type switch + { + KeyNotificationType.Append => append.U8, + KeyNotificationType.Copy => copy.U8, + KeyNotificationType.Del => del.U8, + KeyNotificationType.Expire => expire.U8, + KeyNotificationType.HDel => hdel.U8, + KeyNotificationType.HExpired => hexpired.U8, + KeyNotificationType.HIncrByFloat => hincrbyfloat.U8, + KeyNotificationType.HIncrBy => hincrby.U8, + KeyNotificationType.HPersist => hpersist.U8, + KeyNotificationType.HSet => hset.U8, + KeyNotificationType.IncrByFloat => incrbyfloat.U8, + KeyNotificationType.IncrBy => incrby.U8, + KeyNotificationType.LInsert => linsert.U8, + KeyNotificationType.LPop => lpop.U8, + KeyNotificationType.LPush => lpush.U8, + KeyNotificationType.LRem => lrem.U8, + KeyNotificationType.LSet => lset.U8, + KeyNotificationType.LTrim => ltrim.U8, + KeyNotificationType.MoveFrom => move_from.U8, + KeyNotificationType.MoveTo => move_to.U8, + KeyNotificationType.Persist => persist.U8, + KeyNotificationType.RenameFrom => rename_from.U8, + KeyNotificationType.RenameTo => rename_to.U8, + KeyNotificationType.Restore => restore.U8, + KeyNotificationType.RPop => rpop.U8, + KeyNotificationType.RPush => rpush.U8, + KeyNotificationType.SAdd => sadd.U8, + KeyNotificationType.Set => set.U8, + KeyNotificationType.SetRange => setrange.U8, + KeyNotificationType.SortStore => sortstore.U8, + KeyNotificationType.SRem => srem.U8, + KeyNotificationType.SPop => spop.U8, + KeyNotificationType.XAdd => xadd.U8, + KeyNotificationType.XDel => xdel.U8, + KeyNotificationType.XGroupCreateConsumer => xgroup_createconsumer.U8, + KeyNotificationType.XGroupCreate => xgroup_create.U8, + KeyNotificationType.XGroupDelConsumer => xgroup_delconsumer.U8, + KeyNotificationType.XGroupDestroy => xgroup_destroy.U8, + KeyNotificationType.XGroupSetId => xgroup_setid.U8, + KeyNotificationType.XSetId => xsetid.U8, + KeyNotificationType.XTrim => xtrim.U8, + KeyNotificationType.ZAdd => zadd.U8, + KeyNotificationType.ZDiffStore => zdiffstore.U8, + KeyNotificationType.ZInterStore => zinterstore.U8, + KeyNotificationType.ZUnionStore => zunionstore.U8, + KeyNotificationType.ZIncr => zincr.U8, + KeyNotificationType.ZRemByRank => zrembyrank.U8, + KeyNotificationType.ZRemByScore => zrembyscore.U8, + KeyNotificationType.ZRem => zrem.U8, + KeyNotificationType.Expired => expired.U8, + KeyNotificationType.Evicted => evicted.U8, + KeyNotificationType.New => _new.U8, + KeyNotificationType.Overwritten => overwritten.U8, + KeyNotificationType.TypeChanged => type_changed.U8, + _ => Throw(), + }; + static ReadOnlySpan Throw() => throw new ArgumentOutOfRangeException(nameof(type)); + } + +#pragma warning disable SA1300, CS8981 + // ReSharper disable InconsistentNaming + [FastHash] + internal static partial class append + { + } + + [FastHash] + internal static partial class copy + { + } + + [FastHash] + internal static partial class del + { + } + + [FastHash] + internal static partial class expire + { + } + + [FastHash] + internal static partial class hdel + { + } + + [FastHash] + internal static partial class hexpired + { + } + + [FastHash] + internal static partial class hincrbyfloat + { + } + + [FastHash] + internal static partial class hincrby + { + } + + [FastHash] + internal static partial class hpersist + { + } + + [FastHash] + internal static partial class hset + { + } + + [FastHash] + internal static partial class incrbyfloat + { + } + + [FastHash] + internal static partial class incrby + { + } + + [FastHash] + internal static partial class linsert + { + } + + [FastHash] + internal static partial class lpop + { + } + + [FastHash] + internal static partial class lpush + { + } + + [FastHash] + internal static partial class lrem + { + } + + [FastHash] + internal static partial class lset + { + } + + [FastHash] + internal static partial class ltrim + { + } + + [FastHash("move_from")] // by default, the generator interprets underscore as hyphen + internal static partial class move_from + { + } + + [FastHash("move_to")] // by default, the generator interprets underscore as hyphen + internal static partial class move_to + { + } + + [FastHash] + internal static partial class persist + { + } + + [FastHash("rename_from")] // by default, the generator interprets underscore as hyphen + internal static partial class rename_from + { + } + + [FastHash("rename_to")] // by default, the generator interprets underscore as hyphen + internal static partial class rename_to + { + } + + [FastHash] + internal static partial class restore + { + } + + [FastHash] + internal static partial class rpop + { + } + + [FastHash] + internal static partial class rpush + { + } + + [FastHash] + internal static partial class sadd + { + } + + [FastHash] + internal static partial class set + { + } + + [FastHash] + internal static partial class setrange + { + } + + [FastHash] + internal static partial class sortstore + { + } + + [FastHash] + internal static partial class srem + { + } + + [FastHash] + internal static partial class spop + { + } + + [FastHash] + internal static partial class xadd + { + } + + [FastHash] + internal static partial class xdel + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_createconsumer + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_create + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_delconsumer + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_destroy + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_setid + { + } + + [FastHash] + internal static partial class xsetid + { + } + + [FastHash] + internal static partial class xtrim + { + } + + [FastHash] + internal static partial class zadd + { + } + + [FastHash] + internal static partial class zdiffstore + { + } + + [FastHash] + internal static partial class zinterstore + { + } + + [FastHash] + internal static partial class zunionstore + { + } + + [FastHash] + internal static partial class zincr + { + } + + [FastHash] + internal static partial class zrembyrank + { + } + + [FastHash] + internal static partial class zrembyscore + { + } + + [FastHash] + internal static partial class zrem + { + } + + [FastHash] + internal static partial class expired + { + } + + [FastHash] + internal static partial class evicted + { + } + + [FastHash("new")] + internal static partial class _new // it isn't worth making the code-gen keyword aware + { + } + + [FastHash] + internal static partial class overwritten + { + } + + [FastHash("type_changed")] // by default, the generator interprets underscore as hyphen + internal static partial class type_changed + { + } + + // ReSharper restore InconsistentNaming +#pragma warning restore SA1300, CS8981 +} diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 91b0e1a43..871fe71f0 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1 +1,74 @@ -#nullable enable \ No newline at end of file +#nullable enable +StackExchange.Redis.KeyNotification +StackExchange.Redis.KeyNotification.Channel.get -> StackExchange.Redis.RedisChannel +StackExchange.Redis.KeyNotification.Value.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.KeyNotification.Database.get -> int +StackExchange.Redis.KeyNotification.GetKey() -> StackExchange.Redis.RedisKey +StackExchange.Redis.KeyNotification.IsKeyEvent.get -> bool +StackExchange.Redis.KeyNotification.IsKeySpace.get -> bool +StackExchange.Redis.KeyNotification.KeyByteCount.get -> int +StackExchange.Redis.KeyNotification.KeyNotification() -> void +StackExchange.Redis.KeyNotification.TryCopyKey(System.Span destination, out int bytesWritten) -> bool +StackExchange.Redis.KeyNotification.Type.get -> StackExchange.Redis.KeyNotificationType +static StackExchange.Redis.KeyNotification.TryParse(in StackExchange.Redis.RedisChannel channel, in StackExchange.Redis.RedisValue value, out StackExchange.Redis.KeyNotification notification) -> bool +StackExchange.Redis.ChannelMessage.TryParseKeyNotification(out StackExchange.Redis.KeyNotification notification) -> bool +static StackExchange.Redis.RedisChannel.KeyEvent(StackExchange.Redis.KeyNotificationType type, int? database = null) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeyEvent(System.ReadOnlySpan type, int? database) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeySpace(in StackExchange.Redis.RedisKey key, int database) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeySpacePattern(in StackExchange.Redis.RedisKey pattern, int? database = null) -> StackExchange.Redis.RedisChannel +StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Append = 1 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Copy = 2 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Del = 3 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Evicted = 1001 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Expire = 4 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Expired = 1000 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HDel = 5 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HExpired = 6 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HIncrBy = 8 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HIncrByFloat = 7 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HPersist = 9 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HSet = 10 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.IncrBy = 12 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.IncrByFloat = 11 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LInsert = 13 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LPop = 14 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LPush = 15 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LRem = 16 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LSet = 17 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LTrim = 18 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.MoveFrom = 19 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.MoveTo = 20 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.New = 1002 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Overwritten = 1003 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Persist = 21 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RenameFrom = 22 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RenameTo = 23 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Restore = 24 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RPop = 25 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RPush = 26 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SAdd = 27 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Set = 28 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SetRange = 29 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SortStore = 30 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SPop = 32 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SRem = 31 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.TypeChanged = 1004 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Unknown = 0 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XAdd = 33 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XDel = 34 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupCreate = 36 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupCreateConsumer = 35 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupDelConsumer = 37 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupDestroy = 38 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupSetId = 39 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XSetId = 40 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XTrim = 41 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZAdd = 42 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZDiffStore = 43 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZIncr = 46 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZInterStore = 44 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRem = 49 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRemByRank = 47 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRemByScore = 48 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZUnionStore = 45 -> StackExchange.Redis.KeyNotificationType diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index d4289f3c6..7b208bb9d 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Text; namespace StackExchange.Redis @@ -10,6 +11,8 @@ namespace StackExchange.Redis { internal readonly byte[]? Value; + internal ReadOnlySpan Span => Value is null ? default : Value.AsSpan(); + internal readonly RedisChannelOptions Options; [Flags] @@ -19,19 +22,36 @@ internal enum RedisChannelOptions Pattern = 1 << 0, Sharded = 1 << 1, KeyRouted = 1 << 2, + MultiNode = 1 << 3, } // we don't consider Routed for equality - it's an implementation detail, not a fundamental feature - private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted; + private const RedisChannelOptions EqualityMask = + ~(RedisChannelOptions.KeyRouted | RedisChannelOptions.MultiNode); - internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH; + internal RedisCommand GetPublishCommand() + { + return (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.MultiNode)) switch + { + RedisChannelOptions.None => RedisCommand.PUBLISH, + RedisChannelOptions.Sharded => RedisCommand.SPUBLISH, + _ => ThrowKeyRouted(), + }; + + static RedisCommand ThrowKeyRouted() => throw new InvalidOperationException("Publishing is not supported for multi-node channels"); + } /// - /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios, + /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios, /// or to scenarios using . /// internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0; + /// + /// Should this channel be subscribed to on all nodes? This is only relevant for cluster scenarios and keyspace notifications. + /// + internal bool IsMultiNode => (Options & RedisChannelOptions.MultiNode) != 0; + /// /// Indicates whether the channel-name is either null or a zero-length value. /// @@ -58,6 +78,7 @@ public static bool UseImplicitAutoPattern get => s_DefaultPatternMode == PatternMode.Auto; set => s_DefaultPatternMode = value ? PatternMode.Auto : PatternMode.Literal; } + private static PatternMode s_DefaultPatternMode = PatternMode.Auto; /// @@ -82,7 +103,13 @@ public static bool UseImplicitAutoPattern /// a consideration. /// /// Note that channels from Sharded are always routed. - public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted); + public RedisChannel WithKeyRouting() + { + if (IsMultiNode) Throw(); + return new(Value, Options | RedisChannelOptions.KeyRouted); + + static void Throw() => throw new InvalidOperationException("Key routing is not supported for multi-node channels"); + } /// /// Creates a new that acts as a wildcard subscription. In cluster @@ -105,7 +132,8 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) + public RedisChannel(byte[]? value, PatternMode mode) : this( + value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { } @@ -115,7 +143,9 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt /// The string name of the channel to create. /// The mode for name matching. // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract - public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) + public RedisChannel(string value, PatternMode mode) : this( + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } @@ -128,7 +158,8 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null /// The name of the channel to create. /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions /// using sharded channels must also be published with sharded channels (and vice versa). - public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + public static RedisChannel Sharded(byte[]? value) => + new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); /// /// Create a new redis channel from a string, representing a sharded channel. In cluster @@ -139,7 +170,112 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null /// The string name of the channel to create. /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions /// using sharded channels must also be published with sharded channels (and vice versa). - public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + public static RedisChannel Sharded(string value) => + new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + + /// + /// Create a key-notification channel for a single key in a single database. + /// + public static RedisChannel KeySpace(in RedisKey key, int database) + => BuildKeySpace(key, database, RedisChannelOptions.KeyRouted); + + /// + /// Create a key-notification channel for a pattern, optionally in a specified database. + /// + public static RedisChannel KeySpacePattern(in RedisKey pattern, int? database = null) + => BuildKeySpace(pattern, database, RedisChannelOptions.Pattern | RedisChannelOptions.MultiNode); + + private const int DatabaseScratchBufferSize = 16; // largest non-negative int32 is 10 digits + + private static ReadOnlySpan AppendDatabase(Span target, int? database, RedisChannelOptions options) + { + if (database is null) + { + if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(database)); + return "*"u8; // don't worry about the inbound scratch buffer, this is fine + } + else + { + var db32 = database.GetValueOrDefault(); + if (db32 == 0) return "0"u8; // so common, we might as well special case + if (db32 < 0) throw new ArgumentOutOfRangeException(nameof(database)); + return target.Slice(0, Format.FormatInt32(db32, target)); + } + } + + /// + /// Create an event-notification channel for a given event type, optionally in a specified database. + /// +#pragma warning disable RS0027 + public static RedisChannel KeyEvent(KeyNotificationType type, int? database = null) +#pragma warning restore RS0027 + => KeyEvent(KeyNotificationTypeFastHash.GetRawBytes(type), database); + + /// + /// Create an event-notification channel for a given event type, optionally in a specified database. + /// + /// This API is intended for use with custom/unknown event types; for well-known types, use . + public static RedisChannel KeyEvent(ReadOnlySpan type, int? database) + { + if (type.IsEmpty) throw new ArgumentNullException(nameof(type)); + + RedisChannelOptions options = RedisChannelOptions.MultiNode; + if (database is null) options |= RedisChannelOptions.Pattern; + var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options); + + // __keyevent@{db}__:{type} + var arr = new byte[14 + db.Length + type.Length]; + + var target = AppendAndAdvance(arr.AsSpan(), "__keyevent@"u8); + target = AppendAndAdvance(target, db); + target = AppendAndAdvance(target, "__:"u8); + target = AppendAndAdvance(target, type); + Debug.Assert(target.IsEmpty); // should have calculated length correctly + + return new RedisChannel(arr, options); + } + + private static Span AppendAndAdvance(Span target, scoped ReadOnlySpan value) + { + value.CopyTo(target); + return target.Slice(value.Length); + } + + private static RedisChannel BuildKeySpace(in RedisKey key, int? database, RedisChannelOptions options) + { + int keyLen; + if (key.IsNull) + { + if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(key)); + keyLen = 1; + } + else + { + keyLen = key.TotalLength(); + if (keyLen == 0) throw new ArgumentOutOfRangeException(nameof(key)); + } + + var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options); + + // __keyspace@{db}__:{key} + var arr = new byte[14 + db.Length + keyLen]; + + var target = AppendAndAdvance(arr.AsSpan(), "__keyspace@"u8); + target = AppendAndAdvance(target, db); + target = AppendAndAdvance(target, "__:"u8); + Debug.Assert(keyLen == target.Length); // should have exactly "len" bytes remaining + if (key.IsNull) + { + target[0] = (byte)'*'; + target = target.Slice(1); + } + else + { + target = target.Slice(key.CopyTo(target)); + } + Debug.Assert(target.IsEmpty); // should have calculated length correctly + return new RedisChannel(arr, options); + } internal RedisChannel(byte[]? value, RedisChannelOptions options) { @@ -351,7 +487,7 @@ public static implicit operator RedisChannel(byte[]? key) { return Encoding.UTF8.GetString(arr); } - catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString + catch (Exception e) when // Only catch exception thrown by Encoding.UTF8.GetString (e is DecoderFallbackException or ArgumentException or ArgumentNullException) { return BitConverter.ToString(arr); diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index f13571c4e..d8b62e2b9 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1901,7 +1901,7 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } @@ -1909,7 +1909,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 9ade78c2d..5d083164b 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -393,7 +393,7 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } @@ -401,7 +401,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } diff --git a/src/StackExchange.Redis/RedisValue.cs b/src/StackExchange.Redis/RedisValue.cs index d306ca0d0..1f6947460 100644 --- a/src/StackExchange.Redis/RedisValue.cs +++ b/src/StackExchange.Redis/RedisValue.cs @@ -1245,5 +1245,16 @@ internal ValueCondition Digest() return digest; } } + + internal bool TryGetSpan(out ReadOnlySpan span) + { + if (_objectOrSentinel == Sentinel_Raw) + { + span = _memory.Span; + return true; + } + span = default; + return false; + } } } diff --git a/tests/StackExchange.Redis.Tests/FastHashTests.cs b/tests/StackExchange.Redis.Tests/FastHashTests.cs index 418198cfd..a032cfc80 100644 --- a/tests/StackExchange.Redis.Tests/FastHashTests.cs +++ b/tests/StackExchange.Redis.Tests/FastHashTests.cs @@ -2,13 +2,14 @@ using System.Runtime.InteropServices; using System.Text; using Xunit; +using Xunit.Sdk; #pragma warning disable CS8981, SA1134, SA1300, SA1303, SA1502 // names are weird in this test! // ReSharper disable InconsistentNaming - to better represent expected literals // ReSharper disable IdentifierTypo namespace StackExchange.Redis.Tests; -public partial class FastHashTests +public partial class FastHashTests(ITestOutputHelper log) { // note: if the hashing algorithm changes, we can update the last parameter freely; it doesn't matter // what it *is* - what matters is that we can see that it has entropy between different values @@ -83,6 +84,46 @@ public void FastHashIs_Long() Assert.False(abcdefghijklmnopqrst.Is(hash, value)); } + [Fact] + public void KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths() + { + // Use reflection to find all nested types in KeyNotificationTypeFastHash + var fastHashType = typeof(KeyNotificationTypeFastHash); + var nestedTypes = fastHashType.GetNestedTypes(System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static); + + int? minLength = null; + int? maxLength = null; + + foreach (var nestedType in nestedTypes) + { + // Look for the Length field (generated by FastHash source generator) + var lengthField = nestedType.GetField("Length", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static); + if (lengthField != null && lengthField.FieldType == typeof(int)) + { + var length = (int)lengthField.GetValue(null)!; + + if (minLength == null || length < minLength) + { + minLength = length; + } + + if (maxLength == null || length > maxLength) + { + maxLength = length; + } + } + } + + // Assert that we found at least some nested types with Length fields + Assert.NotNull(minLength); + Assert.NotNull(maxLength); + + // Assert that MinBytes and MaxBytes match the actual min/max lengths + log.WriteLine($"MinBytes: {KeyNotificationTypeFastHash.MinBytes}, MaxBytes: {KeyNotificationTypeFastHash.MaxBytes}"); + Assert.Equal(KeyNotificationTypeFastHash.MinBytes, minLength.Value); + Assert.Equal(KeyNotificationTypeFastHash.MaxBytes, maxLength.Value); + } + [FastHash] private static partial class a { } [FastHash] private static partial class ab { } [FastHash] private static partial class abc { } diff --git a/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs new file mode 100644 index 000000000..b9548eb44 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs @@ -0,0 +1,500 @@ +using System; +using System.Buffers; +using System.Text; +using Xunit; + +namespace StackExchange.Redis.Tests; + +public class KeyNotificationTests(ITestOutputHelper log) +{ + [Fact] + public void Keyspace_Del_ParsesCorrectly() + { + // __keyspace@1__:mykey with payload "del" + var channel = RedisChannel.Literal("__keyspace@1__:mykey"); + RedisValue value = "del"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.False(notification.IsKeyEvent); + Assert.Equal(1, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + Assert.Equal(5, notification.KeyByteCount); + } + + [Fact] + public void Keyevent_Del_ParsesCorrectly() + { + // __keyevent@42__:del with value "mykey" + var channel = RedisChannel.Literal("__keyevent@42__:del"); + RedisValue value = "mykey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.False(notification.IsKeySpace); + Assert.True(notification.IsKeyEvent); + Assert.Equal(42, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + Assert.Equal(5, notification.KeyByteCount); + } + + [Fact] + public void Keyspace_Set_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_Expire_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@5__:expire"); + RedisValue value = "session:12345"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(5, notification.Database); + Assert.Equal(KeyNotificationType.Expire, notification.Type); + Assert.Equal("session:12345", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_Expired_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@3__:cache:item"); + RedisValue value = "expired"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(3, notification.Database); + Assert.Equal(KeyNotificationType.Expired, notification.Type); + Assert.Equal("cache:item", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_LPush_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@0__:lpush"); + RedisValue value = "queue:tasks"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.LPush, notification.Type); + Assert.Equal("queue:tasks", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_HSet_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@2__:user:1000"); + RedisValue value = "hset"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(2, notification.Database); + Assert.Equal(KeyNotificationType.HSet, notification.Type); + Assert.Equal("user:1000", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_ZAdd_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@7__:zadd"); + RedisValue value = "leaderboard"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(7, notification.Database); + Assert.Equal(KeyNotificationType.ZAdd, notification.Type); + Assert.Equal("leaderboard", (string?)notification.GetKey()); + } + + [Fact] + public void TryCopyKey_WorksCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + var lease = ArrayPool.Shared.Rent(20); + Span buffer = lease.AsSpan(0, 20); + Assert.True(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(7, bytesWritten); + Assert.Equal("testkey", Encoding.UTF8.GetString(lease, 0, bytesWritten)); + ArrayPool.Shared.Return(lease); + } + + [Fact] + public void TryCopyKey_FailsWithSmallBuffer() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Span buffer = stackalloc byte[3]; // too small + Assert.False(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(0, bytesWritten); + } + + [Fact] + public void InvalidChannel_ReturnsFalse() + { + var channel = RedisChannel.Literal("regular:channel"); + RedisValue value = "data"; + + Assert.False(KeyNotification.TryParse(in channel, in value, out var notification)); + } + + [Fact] + public void InvalidKeyspaceChannel_MissingDelimiter_ReturnsFalse() + { + var channel = RedisChannel.Literal("__keyspace@0__"); // missing the key part + RedisValue value = "set"; + + Assert.False(KeyNotification.TryParse(in channel, in value, out var notification)); + } + + [Fact] + public void Keyspace_UnknownEventType_ReturnsUnknown() + { + var channel = RedisChannel.Literal("__keyspace@0__:mykey"); + RedisValue value = "unknownevent"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_UnknownEventType_ReturnsUnknown() + { + var channel = RedisChannel.Literal("__keyevent@0__:unknownevent"); + RedisValue value = "mykey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_WithColonInKey_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:user:session:12345"); + RedisValue value = "del"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("user:session:12345", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_Evicted_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@1__:evicted"); + RedisValue value = "cache:old"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(1, notification.Database); + Assert.Equal(KeyNotificationType.Evicted, notification.Type); + Assert.Equal("cache:old", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_New_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:newkey"); + RedisValue value = "new"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.New, notification.Type); + Assert.Equal("newkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_XGroupCreate_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@0__:xgroup-create"); + RedisValue value = "mystream"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.XGroupCreate, notification.Type); + Assert.Equal("mystream", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_TypeChanged_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:mykey"); + RedisValue value = "type_changed"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.TypeChanged, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_HighDatabaseNumber_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@999__:set"); + RedisValue value = "testkey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(999, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_NonIntegerDatabase_ParsesWellEnough() + { + var channel = RedisChannel.Literal("__keyevent@abc__:set"); + RedisValue value = "testkey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(-1, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void DefaultKeyNotification_HasExpectedProperties() + { + var notification = default(KeyNotification); + + Assert.False(notification.IsKeySpace); + Assert.False(notification.IsKeyEvent); + Assert.Equal(-1, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.True(notification.GetKey().IsNull); + Assert.Equal(0, notification.KeyByteCount); + Assert.True(notification.Channel.IsNull); + Assert.True(notification.Value.IsNull); + + // TryCopyKey should return false and write 0 bytes + Span buffer = stackalloc byte[10]; + Assert.False(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(0, bytesWritten); + } + + [Theory] + [InlineData(KeyNotificationTypeFastHash.append.Text, KeyNotificationType.Append)] + [InlineData(KeyNotificationTypeFastHash.copy.Text, KeyNotificationType.Copy)] + [InlineData(KeyNotificationTypeFastHash.del.Text, KeyNotificationType.Del)] + [InlineData(KeyNotificationTypeFastHash.expire.Text, KeyNotificationType.Expire)] + [InlineData(KeyNotificationTypeFastHash.hdel.Text, KeyNotificationType.HDel)] + [InlineData(KeyNotificationTypeFastHash.hexpired.Text, KeyNotificationType.HExpired)] + [InlineData(KeyNotificationTypeFastHash.hincrbyfloat.Text, KeyNotificationType.HIncrByFloat)] + [InlineData(KeyNotificationTypeFastHash.hincrby.Text, KeyNotificationType.HIncrBy)] + [InlineData(KeyNotificationTypeFastHash.hpersist.Text, KeyNotificationType.HPersist)] + [InlineData(KeyNotificationTypeFastHash.hset.Text, KeyNotificationType.HSet)] + [InlineData(KeyNotificationTypeFastHash.incrbyfloat.Text, KeyNotificationType.IncrByFloat)] + [InlineData(KeyNotificationTypeFastHash.incrby.Text, KeyNotificationType.IncrBy)] + [InlineData(KeyNotificationTypeFastHash.linsert.Text, KeyNotificationType.LInsert)] + [InlineData(KeyNotificationTypeFastHash.lpop.Text, KeyNotificationType.LPop)] + [InlineData(KeyNotificationTypeFastHash.lpush.Text, KeyNotificationType.LPush)] + [InlineData(KeyNotificationTypeFastHash.lrem.Text, KeyNotificationType.LRem)] + [InlineData(KeyNotificationTypeFastHash.lset.Text, KeyNotificationType.LSet)] + [InlineData(KeyNotificationTypeFastHash.ltrim.Text, KeyNotificationType.LTrim)] + [InlineData(KeyNotificationTypeFastHash.move_from.Text, KeyNotificationType.MoveFrom)] + [InlineData(KeyNotificationTypeFastHash.move_to.Text, KeyNotificationType.MoveTo)] + [InlineData(KeyNotificationTypeFastHash.persist.Text, KeyNotificationType.Persist)] + [InlineData(KeyNotificationTypeFastHash.rename_from.Text, KeyNotificationType.RenameFrom)] + [InlineData(KeyNotificationTypeFastHash.rename_to.Text, KeyNotificationType.RenameTo)] + [InlineData(KeyNotificationTypeFastHash.restore.Text, KeyNotificationType.Restore)] + [InlineData(KeyNotificationTypeFastHash.rpop.Text, KeyNotificationType.RPop)] + [InlineData(KeyNotificationTypeFastHash.rpush.Text, KeyNotificationType.RPush)] + [InlineData(KeyNotificationTypeFastHash.sadd.Text, KeyNotificationType.SAdd)] + [InlineData(KeyNotificationTypeFastHash.set.Text, KeyNotificationType.Set)] + [InlineData(KeyNotificationTypeFastHash.setrange.Text, KeyNotificationType.SetRange)] + [InlineData(KeyNotificationTypeFastHash.sortstore.Text, KeyNotificationType.SortStore)] + [InlineData(KeyNotificationTypeFastHash.srem.Text, KeyNotificationType.SRem)] + [InlineData(KeyNotificationTypeFastHash.spop.Text, KeyNotificationType.SPop)] + [InlineData(KeyNotificationTypeFastHash.xadd.Text, KeyNotificationType.XAdd)] + [InlineData(KeyNotificationTypeFastHash.xdel.Text, KeyNotificationType.XDel)] + [InlineData(KeyNotificationTypeFastHash.xgroup_createconsumer.Text, KeyNotificationType.XGroupCreateConsumer)] + [InlineData(KeyNotificationTypeFastHash.xgroup_create.Text, KeyNotificationType.XGroupCreate)] + [InlineData(KeyNotificationTypeFastHash.xgroup_delconsumer.Text, KeyNotificationType.XGroupDelConsumer)] + [InlineData(KeyNotificationTypeFastHash.xgroup_destroy.Text, KeyNotificationType.XGroupDestroy)] + [InlineData(KeyNotificationTypeFastHash.xgroup_setid.Text, KeyNotificationType.XGroupSetId)] + [InlineData(KeyNotificationTypeFastHash.xsetid.Text, KeyNotificationType.XSetId)] + [InlineData(KeyNotificationTypeFastHash.xtrim.Text, KeyNotificationType.XTrim)] + [InlineData(KeyNotificationTypeFastHash.zadd.Text, KeyNotificationType.ZAdd)] + [InlineData(KeyNotificationTypeFastHash.zdiffstore.Text, KeyNotificationType.ZDiffStore)] + [InlineData(KeyNotificationTypeFastHash.zinterstore.Text, KeyNotificationType.ZInterStore)] + [InlineData(KeyNotificationTypeFastHash.zunionstore.Text, KeyNotificationType.ZUnionStore)] + [InlineData(KeyNotificationTypeFastHash.zincr.Text, KeyNotificationType.ZIncr)] + [InlineData(KeyNotificationTypeFastHash.zrembyrank.Text, KeyNotificationType.ZRemByRank)] + [InlineData(KeyNotificationTypeFastHash.zrembyscore.Text, KeyNotificationType.ZRemByScore)] + [InlineData(KeyNotificationTypeFastHash.zrem.Text, KeyNotificationType.ZRem)] + [InlineData(KeyNotificationTypeFastHash.expired.Text, KeyNotificationType.Expired)] + [InlineData(KeyNotificationTypeFastHash.evicted.Text, KeyNotificationType.Evicted)] + [InlineData(KeyNotificationTypeFastHash._new.Text, KeyNotificationType.New)] + [InlineData(KeyNotificationTypeFastHash.overwritten.Text, KeyNotificationType.Overwritten)] + [InlineData(KeyNotificationTypeFastHash.type_changed.Text, KeyNotificationType.TypeChanged)] + public unsafe void FastHashParse_AllKnownValues_ParseCorrectly(string raw, KeyNotificationType parsed) + { + var arr = ArrayPool.Shared.Rent(Encoding.UTF8.GetMaxByteCount(raw.Length)); + int bytes; + fixed (byte* bPtr = arr) // encode into the buffer + { + fixed (char* cPtr = raw) + { + bytes = Encoding.UTF8.GetBytes(cPtr, raw.Length, bPtr, arr.Length); + } + } + + var result = KeyNotificationTypeFastHash.Parse(arr.AsSpan(0, bytes)); + log.WriteLine($"Parsed '{raw}' as {result}"); + Assert.Equal(parsed, result); + + // and the other direction: + var fetchedBytes = KeyNotificationTypeFastHash.GetRawBytes(parsed); + string fetched; + fixed (byte* bPtr = fetchedBytes) + { + fetched = Encoding.UTF8.GetString(bPtr, fetchedBytes.Length); + } + + log.WriteLine($"Fetched '{raw}'"); + Assert.Equal(raw, fetched); + + ArrayPool.Shared.Return(arr); + } + + [Fact] + public void CreateKeySpaceNotification_Valid() + { + var channel = RedisChannel.KeySpace("abc", 42); + Assert.Equal("__keyspace@42__:abc", channel.ToString()); + Assert.False(channel.IsMultiNode); + Assert.True(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + Assert.False(channel.IsPattern); + } + + [Theory] + [InlineData(null, null, "__keyspace@*__:*")] + [InlineData("abc*", null, "__keyspace@*__:abc*")] + [InlineData(null, 42, "__keyspace@42__:*")] + [InlineData("abc*", 42, "__keyspace@42__:abc*")] + public void CreateKeySpaceNotificationPattern(string? pattern, int? database, string expected) + { + var channel = RedisChannel.KeySpacePattern(pattern, database); + Assert.Equal(expected, channel.ToString()); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + Assert.True(channel.IsPattern); + } + + [Theory] + [InlineData(KeyNotificationType.Set, null, "__keyevent@*__:set", true)] + [InlineData(KeyNotificationType.XGroupCreate, null, "__keyevent@*__:xgroup-create", true)] + [InlineData(KeyNotificationType.Set, 42, "__keyevent@42__:set", false)] + [InlineData(KeyNotificationType.XGroupCreate, 42, "__keyevent@42__:xgroup-create", false)] + public void CreateKeyEventNotification(KeyNotificationType type, int? database, string expected, bool isPattern) + { + var channel = RedisChannel.KeyEvent(type, database); + Assert.Equal(expected, channel.ToString()); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + if (isPattern) + { + Assert.True(channel.IsPattern); + } + else + { + Assert.False(channel.IsPattern); + } + } + + [Fact] + public void Cannot_KeyRoute_KeySpace_SingleKeyIsKeyRouted() + { + var channel = RedisChannel.KeySpace("abc", 42); + Assert.False(channel.IsMultiNode); + Assert.True(channel.IsKeyRouted); + Assert.True(channel.WithKeyRouting().IsKeyRouted); // no change, still key-routed + Assert.Equal(RedisCommand.PUBLISH, channel.GetPublishCommand()); + } + + [Fact] + public void Cannot_KeyRoute_KeySpacePattern() + { + var channel = RedisChannel.KeySpacePattern("abc", 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void Cannot_KeyRoute_KeyEvent() + { + var channel = RedisChannel.KeyEvent(KeyNotificationType.Set, 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void Cannot_KeyRoute_KeyEvent_Custom() + { + var channel = RedisChannel.KeyEvent("foo"u8, 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void KeyEventPrefix_KeySpacePrefix_Length_Matches() + { + // this is a sanity check for the parsing step in KeyNotification.TryParse + Assert.Equal(KeyNotificationChannels.KeySpacePrefix.Length, KeyNotificationChannels.KeyEventPrefix.Length); + } +}