【橙子老哥】C# CAP源码原理剖析深入解读

hello,大家好呀,欢迎来到橙子老哥的分享时刻,希望大家一起学习,一起进步。1、概述好久不见,今天我们来看看老朋友:CAP,当然这个cap是NCC下的一个开源工具包,用于解决分布式下事务最终一致性问题跟shardingcore一样,被nc

【橙子老哥】C# CAP源码原理剖析深入解读

hello,大家好呀,欢迎来到橙子老哥的分享时刻,希望大家一起学习,一起进步。

1、概述

好久不见,今天我们来看看老朋友:CAP,当然这个cap是NCC下的一个开源工具包,用于解决分布式下事务最终一致性问题

shardingcore一样,被ncc收编,所以代码质量还是杠杠的,推荐大家阅读学习

有人疑惑了,这个之前不是讲过一遍吗?【橙子老哥】C# 实操分布式事务解决方案

其实分布式事务解决方案内容非常多,每个场景都有不同的方案,不是几篇文章就能讲完的,之前那篇讲了经典的3类

  1. tcc
  2. 2pc
  3. 3pc

还有很多方式,包括以下:

  1. 同步执行(适合实时性要求不高,并发不高,不引入任何中间件)
  2. 基于数据库的分布式事务(适合实时性要求高,并发要求高,资源足,只依赖数据库)
  3. 基于消息驱动数据库的分布式事务(实时性要求高,并发要求高,可引入第三方中间件)

而本篇讲的CAP源码的原理就是基于消息驱动数据库的分布式事务最终一致性

2、流程

以下是我整出来的CAP源码核心对象思维导图:

在这里插入图片描述

可以看出,东西其实不多,主要就是3个核心流程

  1. IBootstrapper (初始化启动)
  2. ICapPublisher (发布消息)
  3. Dispatcher(调度器)执行

而不管是初始化,还是在发布消息的时候,最终都调用了调度器去执行

  • 初始化的时候:找出所有消息订阅者线程挂起监听事件,有事件过来通过调度器去执行对应的方法
  • 发布消息的时候:通过ICapPublisher对象发送消息,也是调用的调度器去执行

3、初始化

我们在ServiceCollectionExtensions.AddCap

(this IServiceCollection services, Action setupAction)入口看看初始化做了什么 它在加了一堆核心对象之后,最后做了一个初始化

代码语言:javascript代码运行次数:0运行复制
services.AddSingleton<Bootstrapper>();
services.AddHostedService(sp => sp.GetRequiredService<Bootstrapper>());
services.AddSingleton<IBootstrapper>(sp => sp.GetRequiredService<Bootstrapper>());

通过AddHostedService的方式进行运行,这个是Asp.NetCore提供的初始化的方法

代码语言:javascript代码运行次数:0运行复制
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await BootstrapAsync(stoppingToken).ConfigureAwait(false);
}

public async Task BootstrapAsync(CancellationToken cancellationToken = default)
    {
        if (_cts != null)
        {
            _logger.LogInformation("### CAP background task is already started!");

            return;
        }

        _logger.LogDebug("### CAP background task is starting.");

        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        CheckRequirement();

        //获取全部进程服务IProcessingServer,目的是给下面的_cts.Token.Register 注册一个释放的事件
        _processors = _serviceProvider.GetServices<IProcessingServer>();

        try
        {
            //通过IStorageInitializer进行一个持久化的初始化,mysql的就是一堆创建语句,这里就不进入了
            await _serviceProvider.GetRequiredService<IStorageInitializer>().InitializeAsync(_cts.Token).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            if (e is InvalidOperationException) throw;
            _logger.LogError(e, "Initializing the storage structure failed!");
        }


        //这里注册一个事件,这些初始化进程被取消的时候执行
        _cts.Token.Register(() =>
        {
            _logger.LogDebug("### CAP background task is stopping.");


            foreach (var item in _processors)
                try
                {
                    item.Dispose();
                }
                catch (OperationCanceledException ex)
                {
                    _logger.ExpectedOperationCanceledException(ex);
                }
        });

        //再往里面走一点,走到下面的方法
        await BootstrapCoreAsync().ConfigureAwait(false);

        _disposed = false;
        _logger.LogInformation("### CAP started!");
    }

   protected virtual async Task BootstrapCoreAsync()
    {
        //拿上面注入的进程服务,全部调用了一下Start
        foreach (var item in _processors)
        {
            try
            {
                _cts!.Token.ThrowIfCancellationRequested();

                await item.Start(_cts!.Token);
            }
            catch (OperationCanceledException)
            {
                // ignore
            }
            catch (Exception ex)
            {
                _logger.ProcessorsStartedError(ex);
            }
        }
    }

从上面的代码可以看到,包了一层,目的是执行store的初始化 和 IProcessingServer的star初始化,IProcessingServer有4个实现

  1. CapProcessingServer 包一层,做无限递归
  2. ConsulProcessingNodeServer(给consul服务注册)
  3. DiagnosticRegister(诊断器注册)使用DiagnosticListener做aop操作
  4. Dispatcher(调度器初始化注册下)
  5. ConsumerRegister(订阅者注册)

上面重点的是第1点和第5点,由于篇幅有限,其他的大家可以通过前面梳理好的思维导图自己跟踪一下

CapProcessingServer是个很有意思的东西,它的start方法,又全部调用了IProcessor的ProcessAsync,包了4个IProcessor

  1. TransportCheckProcessor 检测进程
  2. MessageNeedToRetryProcessor 消息重试进程
  3. MessageDelayedProcessor 延迟消息进程
  4. CollectorProcessor 收集者进程(删除过期数据)
代码语言:javascript代码运行次数:0运行复制
    public Task Start(CancellationToken stoppingToken)
    {
        stoppingToken.Register(() => _cts.Cancel());

        _logger.ServerStarting();

        _context = new ProcessingContext(_provider, _cts.Token);

        var processorTasks = GetProcessors()
            //这里执行了递归
            .Select(InfiniteRetry)
            .Select(p => p.ProcessAsync(_context));
         _compositeTask = Task.WhenAll(processorTasks);

        return Task.CompletedTask;
    }

    private IProcessor InfiniteRetry(IProcessor inner)
    {
        returnnew InfiniteRetryProcessor(inner, _loggerFactory);
    }

private IProcessor[] GetProcessors()
    {
        var returnedProcessors = new List<IProcessor>
        {
            _provider.GetRequiredService<TransportCheckProcessor>(),
            _provider.GetRequiredService<MessageNeedToRetryProcessor>(),
            _provider.GetRequiredService<MessageDelayedProcessor>(),
            _provider.GetRequiredService<CollectorProcessor>()
        };

        return returnedProcessors.ToArray();
    }

而这4个执行之前调用,InfiniteRetry,这个方法又是把参数传给了,InfiniteRetryProcessor传给了自己,形成了死循环,这个线程就一直被挂起来了

代码语言:javascript代码运行次数:0运行复制
InfiniteRetryProcessor:

public async Task ProcessAsync(ProcessingContext context)
    {
        while (!context.IsStopping)
            try
            {
                await _inner.ProcessAsync(context).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                //ignore
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Processor '{ProcessorName}' failed. Retrying...", _inner.ToString());
                await context.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false);
            }
    }

所以,CAP这么一玩,其实还挺巧妙的,把需要无线执行挂起的进程和只用初始化一次的进程又很好的区分开了

另一个核心的进程服务,是前面的ConsumerRegister(订阅者注册),它的start方法和其他事件插件一样,反射找到所有订阅者,然后进行监听

代码语言:javascript代码运行次数:0运行复制
 public void Execute()
    {
        //通过MethodMatcherCache _selector 获取到所有订阅者,这里面的代码都比较简单,就不深入了
        var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped();

        foreach (var matchGroup in groupingMatches)
        {
            ICollection<string> topics;
            //获取限制
            var limit = _selector.GetGroupConcurrentLimit(matchGroup.Key);
            try
            {
                // 这里通过工厂包一层拿到对应的CnsumerClient
                using (var client = _consumerClientFactory.Create(matchGroup.Key, limit))
                {
                    client.OnLogCallback = WriteLog;
                    //有些消息队列是需要先同步主题的,要先执行下这个
                    topics = client.FetchTopics(matchGroup.Value.Select(x => x.TopicName));
                }
            }

            //根据配置的cap消费者线程数量,开启多少个线程
            for (var i = 0; i < _options.ConsumerThreadCount; i++)
            {
                var topicIds = topics.Select(t => t);
                //核心,开线程
                _ = Task.Factory.StartNew(() =>
                  {
                      try
                      {
                          // ReSharper disable once ConvertToUsingDeclaration
                          using (var client = _consumerClientFactory.Create(matchGroup.Key, limit))
                          {
                              _serverAddress = client.BrokerAddress;
                             // 注册一个消息回调,回调方法里面最终执行了IDispatcher.EnqueueToExecute(调度器的执行方法)
                              RegisterMessageProcessor(client);
                             //订阅消息
                              client.Subscribe(topicIds);
                             //进行监听,while 死循环,将线程挂起
                              client.Listening(_pollingDelay, _cts.Token);
                          }
                      }
                  }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
        }

        _compositeTask = Task.CompletedTask;
    }

以上订阅的操作,是给每一个消费者开启一个挂起的线程进行监听,同时在监听事件中注册事件接受方法,这个消息回调方法 执行了IDispatcher.EnqueueToExecute,将执行本地具体的方法 很多的事件插件,其实都基本是这样玩的,所以这块看着还挺熟悉的

4、消息发布

接下来,我们到了ICapPublisher ,看看publish做了什么,这里代码比较多,我过滤掉一些校验,日志等非核心的东西

代码语言:javascript代码运行次数:0运行复制
private async Task PublishInternalAsync<T>(string name, T? value, IDictionary<string, string?> headers, TimeSpan? delayTime = null,
        CancellationToken cancellationToken = default)
    {
            //诊断器aop
            tracingTimestamp = TracingBefore(message);
            //如果我们没有开事务(==没有开事务,那你用啥子cap,所以下面这个if代码不太明白意义,就只是直接当一个mq发出去了)
            if (Transaction?.DbTransaction == null)
            {
                //store存储消息
                var mediumMessage = await _storage.StoreMessageAsync(name, message).ConfigureAwait(false);

                //诊断器aop
                TracingAfter(tracingTimestamp, message);


                if (delayTime != null)
                {
                    //通过调度器,发送延迟消息
                    await _dispatcher.EnqueueToScheduler(mediumMessage, publishTime).ConfigureAwait(false);
                }
                else
                {
                    //通过调度器,直接发送消息
                    await _dispatcher.EnqueueToPublish(mediumMessage).ConfigureAwait(false);
                }
            }
            else
            {
                //这里我们开启了事务
                var transaction = (CapTransactionBase)Transaction;

                //store存储消息
                var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction)
                    .ConfigureAwait(false);

                //诊断器aop
                TracingAfter(tracingTimestamp, message);

                //这里注意哦,并没有直接发送事件,而是塞到了事务中的队列中(_bufferList)
                transaction.AddToSent(mediumMessage);

                //提交事务,把我们的 store要插入的消息和其他的业务事务,一起打包提交
                if (transaction.AutoCommit) await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
            }
    }

上面的核心是mq并没有发出去,而是塞到了业务的事务,一起跟着事务提交,而我们的mq被存储到transaction对象中的队列中了而已,我们继续看提交的时候做了什么

代码语言:javascript代码运行次数:0运行复制
 public override async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        Debug.Assert(DbTransaction != null);

        switch (DbTransaction)
        {
            case DbTransaction dbTransaction:
                await dbTransaction.CommitAsync(cancellationToken).ConfigureAwait(false);
                break;
            case IDbContextTransaction dbContextTransaction:
                await dbContextTransaction.CommitAsync(cancellationToken).ConfigureAwait(false);
                break;
        }
        //注意这个,这个是遍历之前发消息的存储的队列
        await FlushAsync();
    }

   protected virtual async Task FlushAsync()
    {
        while (!_bufferList.IsEmpty)
        {
            if (_bufferList.TryDequeue(outvar message))
            {
                var isDelayMessage = message.Origin.Headers.ContainsKey(Headers.DelayTime);
                if (isDelayMessage)
                {

                    await _dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!, CultureInfo.InvariantCulture)).ConfigureAwait(false);

                }
                else
                {
                    await _dispatcher.EnqueueToPublish(message).ConfigureAwait(false);
                }
            }
        }
    }

看到这里,也就很清楚了,核心的地方其实就这么几句,发送事件的时候,没有真的发,而是存了下来,等我们的消息表和业务的事务一起提交之后,再通过_dispatcher调度器发送出去 当mq发送成功之后,再把消息表状态更改,这样就确保了可靠性

5、Dispatcher 执行(调度)器

前面其实核心都过了一遍了,最后剩下的就是这个调度器,可以看出,不管是发送消息,还算消费消息,都是通过这个所谓的调度器进行调度执行的,但是这个里面倒没有什么难点

我们重点看它下面两个方法:

  1. EnqueueToExecute执行方法
  2. EnqueueToPublish 发布消息

EnqueueToExecute其实就是和大部分的事件插件一样,根据订阅者信息和参数,反射去执行具体订阅的方法 具体就是:ActivatorUtilities.GetServiceOrCreateInstance 反射创建订阅者,然后executor.ExecuteAsync 反射执行方法

EnqueueToPublish也只是多包了几层,为了兼容多种消息队列,最终也是调用发送一个消息队列而已 具体就是通过IMessageSender(消息发送器)包一层,调用具体的Transport的去SendAsync

6、总结

如果你看到这里,那就很清楚,我们讲的CAP源码原理就是基于消息驱动数据库的分布式事务

这里主要我总结几点:

  • 使用数据库事务保证消息表和自己的业务一致
  • 使用消息驱动推送来提高服务通讯效率
  • 使用数据库表轮询方式确保消息重试

这个流程跟ABP的分布式事件其实大差不多,但是其中做的事情又有明显的区别 这里,我先给大家埋个坑:ABP的分布式事件 和 CAP的事件 有什么本质区别呢?清楚了这个,也基本知道CAP核心到底做了什么,剩下的交给大家的评论区发表自己的观点吧~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。原始发表:2025-04-03,如有侵权请联系 cloudcommunity@tencent 删除c#事件事务原理源码

发布者:admin,转转请注明出处:http://www.yc00.com/web/1747990322a4715834.html

相关推荐

  • 【橙子老哥】C# CAP源码原理剖析深入解读

    hello,大家好呀,欢迎来到橙子老哥的分享时刻,希望大家一起学习,一起进步。1、概述好久不见,今天我们来看看老朋友:CAP,当然这个cap是NCC下的一个开源工具包,用于解决分布式下事务最终一致性问题跟shardingcore一样,被nc

    5小时前
    10

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信