使用 C# 实现一个 Event Bus
Event Bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。
它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:IEvent
和IAsyncEventHandler<TEvent>
。
IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>
是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。
其中,Publish<TEvent>
和PublishAsync<TEvent>
方法用于发布事件,而OnSubscribe<TEvent>
方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>
。它实现了ILocalEventBusManager<TEvent>
接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>
来存储事件,并提供了发布事件的方法Publish
和PublishAsync
。此外,它还提供了一个自动处理事件的方法AutoHandle
。
总的来说Event Bus
提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束,实现IEvent
约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent
来约束事件的处理程序。
public interface IEvent
{
}
public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
Task HandleAsync(IEvent @event);
void HandleException(IEvent @event, Exception ex);
}
接下来规定一下咱们的IEventBus
,会有哪些操作方法。基本就是发布和订阅。
public interface IEventBus
{
void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
void OnSubscribe<TEvent>() where TEvent : IEvent;
}
实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager
即本地事件管理,第二种是LocalEventBusPool
池化本地事件。
LocalEvnetBusManager
LocalEventBusManager
主要在单一管道内进行处理,集中进行消费。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{
void Publish(TEvent @event);
Task PublishAsync(TEvent @event) ;
void AutoHandle();
}
public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
where TEvent: IEvent
{
readonly IServiceProvider _servicesProvider = serviceProvider;
private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
public void Publish(TEvent @event)
{
Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
_eventChannel.Writer.WriteAsync(@event);
}
private CancellationTokenSource Cts { get; } = new();
public void Cancel()
{
Cts.Cancel();
}
public async Task PublishAsync(TEvent @event)
{
await _eventChannel.Writer.WriteAsync(@event);
}
public void AutoHandle()
{
// 确保只启动一次
if (!Cts.IsCancellationRequested) return;
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await _eventChannel.Reader.ReadAsync();
await HandleAsync(reader);
}
}, Cts.Token);
}
async Task HandleAsync(TEvent @event)
{
var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler is null)
{
throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
}
try
{
await handler.HandleAsync(@event);
}
catch (Exception ex)
{
handler.HandleException( @event, ex);
}
}
}
LocalEventBusPool
LocalEventBusPool
即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
private readonly IServiceProvider _serviceProvider = serviceProvider;
private class ChannelKey
{
public required string Key { get; init; }
public int Subscribers { get; set; }
public override bool Equals(object? obj)
{
if (obj is ChannelKey key)
{
return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
}
return false;
}
public override int GetHashCode()
{
return 0;
}
}
private Channel<IEvent> Rent(string channel)
{
_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);
if (value != null) return value;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(new ChannelKey() { Key = channel }, value);
return value;
}
private Channel<IEvent> Rent(ChannelKey channelKey)
{
_channels.TryGetValue(channelKey, out var value);
if (value != null) return value;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(channelKey, value);
return value;
}
private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
private CancellationTokenSource Cts { get; } = new();
public void Cancel()
{
Cts.Cancel();
_channels.Clear();
Cts.TryReset();
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
}
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
}
public void OnSubscribe<TEvent>() where TEvent : IEvent
{
var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
new ChannelKey() { Key = typeof(TEvent).Name };
channelKey.Subscribers++;
Task.Run(async () =>
{
try
{
while (!Cts.IsCancellationRequested)
{
var @event = await ReadAsync(channelKey);
var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
try
{
await handler.HandleAsync((TEvent)@event);
}
catch (Exception ex)
{
handler.HandleException((TEvent)@event, ex);
}
}
}
catch (Exception e)
{
throw new InvalidOperationException("Error on onSubscribe handler", e);
}
}, Cts.Token);
}
private async Task<IEvent> ReadAsync(string channel)
{
return await Rent(channel).Reader.ReadAsync(Cts.Token);
}
private async Task<IEvent> ReadAsync(ChannelKey channel)
{
return await Rent(channel).Reader.ReadAsync(Cts.Token);
}
}
LocalEventBus
实现LocalEventBus
继承自IEventBus
即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
public interface ILocalEventBus: IEventBus
{
}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.Publish(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.Publish(@event);
}
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
await EventBusPool.PublishAsync(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
await manager.PublishAsync(@event);
}
}
public void OnSubscribe<TEvent>() where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.OnSubscribe<TEvent>();
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.AutoHandle();
}
}
}
分布式事件总线
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。