云原生电商微服务实战1:搭建基础框架
2024-09-07 22:29:37- 一直以来想专心写一个基于云原生的微服务案例,但总角色要写的内容太多了。电商案例本身并不会太难,而要搭建好一整套云原生和微服务的基础设施,并不是一项轻松的工作。随着.Net Asprise的推出,再集成Dapr,开发和生产几乎可以保持一样的环境了。这让我惊喜,曾经我们手动搭建k8s,手动配置集群以及可以成为历史,我们需要的就是使用这些工具,重新把核心回归到业务上来。
正好新公司也是做电商业务的,于是我终于决定开始写一套云原生电商微服务的案例。一方面可以学习Asprise和Dapr的相关知识,也是一次属性电商核心业务的机会。
这次的内容会写得比较细,目标是把这个项目当成一个培训项目来写。让不了解云原生、不了解微服务和不了解电商业务的人,也能看得懂该项目。
一、DDD
DDD相关的文章以及很多了,我在几年前也写过一个DDD的学习系列,本文就只是做个总结,毕竟微服务是离不开DDD的。
解决什么问题
- 问题域
- 需求分析
- 分析理解复杂业务领域问题
- 准确反映业务语言
领域分析概念
- 领域
- 子域
- 核心域、通用域和支撑域
- 限界上下文
领域建模概念
- 实体与值对象
- 聚合与聚合根
- 领域事件
- 领域服务
- 仓储
- 工作单元模式
- 规约
- 应用服务
- 防腐层
领域驱动设计
CQRS: 命令与查询分离,作为一种战术办法,是实现DDD建模领域的最佳途径之一。
充血模型: 让模型自带业务逻辑,业务属性的改变只有模型自身可以操作。
二、整洁架构
核心原则
- 独立于框架:整洁架构的系统核心业务逻辑不依赖于具体的软件框架,业务逻辑部分都能够独立运行。这样在框架更新或者替换时,对核心业务的影响最小。
- 可测试性:架构设计使得业务规则可以很方便地被测试。因为业务逻辑是独立于外部组件(如数据库、用户界面等)的,所以可以使用单元测试来验证业务规则的正确性。比如,在一个电商系统中,“计算商品折扣”的业务规则可以通过提供模拟的商品价格数据来进行单元测试,而不需要真正地连接数据库或者启动整个用户界面。
- 独立于UI(用户界面):业务逻辑与用户界面相互独立。这意味着可以方便地替换用户界面,比如从一个命令行界面转换为图形界面,或者从Web界面转换为移动应用界面,而不会影响到业务逻辑。
- 独立于数据库:系统的核心不依赖于数据库的类型和实现。不管是使用关系型数据库(如MySQL)还是非关系型数据库(如MongoDB),业务逻辑部分都能保持稳定。
洋葱架构
依赖原则:上图的同心圆代表软件的不同部分。总的来说,越往里面,代码级别越高。外面的圆是实现机制,而内层的圆是原则。
Entities:Entities封装了企业级的业务规则。一个Entity可以是一个带方法的对象,也可以是一个数据结构和方法集。Entities可以被用于企业的其他应用。
Use Cases: 这一层包含了应用特有的业务规则。它封装和实现了系统的所有用例。这些用例协调数据从entities的流入和流出,并且指导entities使用它们的企业级业务规则来达到用例的目标。
三、项目框架
上图是本电商项目每个微服务实现使用的分层架构,在具体实现每个微服务前,先为项目打好地基,编写一些通用类库。
上图是目前搭好的电商项目整体架构,其中:
- aspire:.net 8开始提供的开发工具,用于构建和运行云原生应用程序
- gateways:网关项目
- services:各个微服务的实现
- app:存放非核心域服务
- works:后台作业
- pakages:项目需要用到独立的可以打包的nuget包,可以拿到其他项目中使用
- shared:项目的共享类库,为实现各个微服务提供共享的基础类。
四、领域层模型规范
本项目使用.NET 9开发,现在我们先从shared开始。
-
在shared文件夹创建DDD.DHT.SharedKernel类库项目,修改默认命名空间为DDD.DHT
-
创建文件夹Domain,分别实现如下3个类
IEntity.cs
- namespace DDM.DHT.Domain;
- public interface IEntity;
- public interface IEntity<TId> : IEntity
- {
- TId Id { get; set; }
- }
IAggregateRoot.cs
- namespace DDM.DHT.Domain;
- public interface IAggregateRoot;
BaseEntity.cs
- using System.ComponentModel.DataAnnotations;
- using System.ComponentModel.DataAnnotations.Schema;
- namespace DDM.DHT.Domain;
- public abstract class BaseEntity<TId> : IEntity<TId>
- {
- private readonly List<BaseEvent> _domainEvents = [];
- [NotMapped]
- public IReadOnlyCollection<BaseEvent> DomainEvents => _domainEvents.AsReadOnly();
- [Key]
- public virtual TId Id { get; set; } = default!;
- public void AddDomainEvent(BaseEvent domainEvent)
- {
- _domainEvents.Add(domainEvent);
- }
- public void RemoveDomainEvent(BaseEvent domainEvent)
- {
- _domainEvents.Remove(domainEvent);
- }
- public void ClearDomainEvents()
- {
- _domainEvents.Clear();
- }
- }
-
在shared文件夹创建DDM.DHT.Core.Common类库项目,也修改默认命名空间为DDD.DHT。这个类库与SharedEvent的区别是,该类库实现主要是针对本项目的。而SharedEvent实际上可以做出nuget包供其他项目使用
-
在DDM.DHT.Core.Common项目中创建Domain文件夹,实现如下3个类:
BaseEntity.cs
- namespace DDM.DHT.Domain;
- public abstract class BaseEntity : BaseEntity<long>;
BaseAuditEntity.cs
- namespace DDM.DHT.Domain;
- public abstract class BaseAuditEntity : BaseEntity
- {
- public DateTime? CreatedAt { get; set; }
- public DateTime? LastModifiedAt { get; set; }
- }
AuditWithUserEntity.cs
- namespace DDM.DHT.Domain;
- public abstract class AuditWithUserEntity : BaseAuditEntity
- {
- public long? CreatedBy { get; set; }
- public long? LastModifiedBy { get; set; }
- }
五、通用仓储规范
仓储要重点说一下,按照DDD的要求,所有聚合的属性都是通过聚合根来操作的,而仓储是用来实现数据持久化的。因此根据DDD的要求,只有聚合根才会有仓储接口和具体实现。然而在具体开发中,这种约束会使得代码变得复杂,有些简单的,可能仅仅只有CRUD的实体,因为不是聚合根,没有自己的仓储实现,设计者可能被迫把这类实体也设计成聚合根了。
读过ABP源码的朋友应该清楚,ABP的仓储就不是直接约束的IAggregateRoot,而是改成了IEntity。这样完全放开,又存在未来开发者随便修改聚合属性,破坏聚合根的管辖职责。
结合这些情况,本项目对仓储基类的设计做了一些特别的约束。见代码分析:
-
在DDM.DHT.SharedKernel项目添加Repository文件夹,创建IReadRepository.cs
- public interface IReadRepository<T> where T : class, IEntity
- {
- Task<T?> GetByIdAsync<TKey>(TKey id, CancellationToken cancellationToken = default);
- Task<List<T>> GetListAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
- Task<T?> GetSingleOrDefaultAsync(ISpecification<T>? specification = null,
- CancellationToken cancellationToken = default);
- Task<int> CountAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
- Task<bool> AnyAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
- }
注意看,这个接口的泛型约束是IEntity,而接口方法只是定义了一些读方法。如果只是读数据,该仓储是不会破坏到聚合内部的数据的,因此约束可以放开到IEntity。
另外ISpecification这个接口是一个规约接口,为仓储提供非id属性的条件查询实现。
Specification.cs实现如下
- public abstract class Specification<T> : ISpecification<T> where T : class, IEntity
- {
- public Expression<Func<T, bool>>? FilterCondition { get; protected init; }
- public List<Expression<Func<T, object>>> Includes { get; } = [];
- public List<string> IncludeStrings { get; } = [];
- public Expression<Func<T, object>>? OrderBy { get; private set; }
- public Expression<Func<T, object>>? OrderByDescending { get; private set; }
- public Expression<Func<T, object>>? GroupBy { get; private set; }
- public int Take { get; private set; }
- public int Skip { get; private set; }
- public bool IsPagingEnabled { get; private set; }
- protected void AddInclude(Expression<Func<T, object>> includeExpression)
- {
- Includes.Add(includeExpression);
- }
- protected void AddInclude(string includeString)
- {
- IncludeStrings.Add(includeString);
- }
- protected void SetPaging(int skip, int take)
- {
- Skip = skip;
- Take = take;
- IsPagingEnabled = true;
- }
- protected void SetOrderBy(Expression<Func<T, object>> orderByExpression)
- {
- OrderBy = orderByExpression;
- }
- protected void SetOrderByDescending(Expression<Func<T, object>> orderByDescExpression)
- {
- OrderByDescending = orderByDescExpression;
- }
- protected void SetGroupBy(Expression<Func<T, object>> groupByExpression)
- {
- GroupBy = groupByExpression;
- }
- }
-
接着我们继续创建IRepository.cs类
- //该仓储操作的聚合根实体类型
- public interface IRepository<T> : IReadRepository<T> where T : class, IEntity, IAggregateRoot
- {
- T Add(T entity);
- void Update(T entity);
- void Delete(T entity);
- Task<int> BatchDeleteAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
- Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
- }
这个仓储接口继承IReadRepository,同时它的泛型约束添加了IAggregateRoot,也就是说它既拥有了能灵活查询实体对象的优势,又提供了仅聚合根能操作实体属性变更的约束。
-
我们继续添加一个IGenericRepository仓储,代码如下:
- //该仓储操作的通用实体类型
- public interface IGenericRepository<T> : IReadRepository<T> where T : class, IEntity
- {
- Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
- }
这个接口很有意思,它继承IReadRepository仓储后,多了一个SaveChangesAsync方法。也就是说这个仓储可以把查询获取的实体对象属性变更后,重新持久化。
再看这个仓储接口的约束是IEntity,也就是说它为那些简单的只需要做CRUD的实体提供了简单了仓储实现,而不依赖与聚合根。
-
在shared文件夹,创建DDM.DHT.Infrastructure.EFCore项目。添加Pomelo.EntityFrameworkCore.MySql 9.0.0引用,注意目前该版本是preview的版本。
-
创建Repositories文件夹,分别去实现IReadRepository、IRepository和IGenericRepository接口。代码我这类不贴了,可下载完整的项目文件来查看。
六、通用数据返回对象
一般我们在开发项目时,每个接口要返回数据类型是不一样的,比如有些接口返回的是整数、有的返回的是List等等,那么前端在解析不同的返回数据类型时就会很麻烦,为了解决这个问题,需要对返回结果进行统一的封装。
不仅仅是前端访问接口,即使是后端程序各个服务之间,或者各个层次之间都会有数据返回。
因此需要规范一个通用的数据返回对象。
- 在DDD.DHT.SharedKernel项目添加Return文件夹,创建统一的IResult接口和实现,代码如下。
- public enum ResultStatus
- {
- Ok,
- Error,
- Forbidden,
- Unauthorized,
- NotFound,
- Invalid
- }
-
- public interface IResult
- {
- IEnumerable<string>? Errors { get; }
-
- bool IsSuccess { get; }
-
- ResultStatus Status { get; }
-
- object? GetValue();
- }
-
- public class Result<T> : IResult
- {
- public Result() : this(default(T))
- {
- }
-
- protected internal Result(T? value)
- {
- Value = value;
- }
-
- protected internal Result(ResultStatus status)
- {
- Status = status;
- }
-
- public T? Value { get; init; }
-
- public bool IsSuccess => Status == ResultStatus.Ok;
-
- public IEnumerable<string>? Errors { get; protected set; }
-
- public ResultStatus Status { get; protected set; } = ResultStatus.Ok;
-
- public object? GetValue()
- {
- return Value;
- }
-
- public static implicit operator Result<T>(Result result)
- {
- return new Result<T>(default(T))
- {
- Status = result.Status,
- Errors = result.Errors
- };
- }
- }
-
- public class Result : Result<Result>
- {
- public Result() : base(null)
- {
- }
-
- protected internal Result(Result value) : base(value)
- {
- }
-
- protected internal Result(ResultStatus status) : base(status)
- {
- }
-
- public static Result From(IResult result)
- {
- return new Result(result.Status)
- {
- Errors = result.Errors
- };
- }
-
- public static Result Success()
- {
- return new Result(ResultStatus.Ok);
- }
-
- public static Result<T> Success<T>(T value)
- {
- return new Result<T>(value);
- }
-
- public static Result Failure()
- {
- return new Result(ResultStatus.Error);
- }
-
- public static Result Failure(params string[] errors)
- {
- return new Result(ResultStatus.Error)
- {
- Errors = errors.AsEnumerable()
- };
- }
-
- public static Result NotFound()
- {
- return new Result(ResultStatus.NotFound);
- }
-
- public static Result NotFound(params string[] error)
- {
- return new Result(ResultStatus.NotFound)
- {
- Errors = error.AsEnumerable()
- };
- }
-
- public static Result Forbidden()
- {
- return new Result(ResultStatus.Forbidden);
- }
-
- public static Result Unauthorized()
- {
- return new Result(ResultStatus.Unauthorized);
- }
-
- public static Result Invalid()
- {
- return new Result(ResultStatus.Invalid);
- }
-
- public static Result Invalid(params string[] errors)
- {
- return new Result(ResultStatus.Invalid)
- {
- Errors = errors.AsEnumerable()
- };
- }
- }
- 有了通用的泛型返回对象,我们再补偿一个通用的分页数据返回对象
- public class PagedMetaData
- {
- public int CurrentPage { get; set; }
- public int TotalPages { get; set; }
- public int PageSize { get; set; }
- public long TotalCount { get; set; }
-
- public bool HasPrevious => CurrentPage > 1;
-
- public bool HasNext => CurrentPage < TotalPages;
- }
-
- public class Pagination
- {
- private const int MaxPageSize = 100;
-
- public int PageNumber { get; set; } = 1;
-
- private int _pageSize = 10;
-
- public int PageSize
- {
- get => _pageSize;
- set => _pageSize = value > MaxPageSize ? MaxPageSize : value;
- }
- }
-
- public class PagedList<T> : List<T>
- {
- public PagedList(IEnumerable<T> items, long count, Pagination pagination)
- {
- MetaData = new PagedMetaData
- {
- TotalCount = count,
- PageSize = pagination.PageSize,
- CurrentPage = pagination.PageNumber,
- TotalPages = (int)Math.Ceiling(count / (double)pagination.PageSize)
- };
-
- AddRange(items);
- }
-
- public PagedMetaData MetaData { get; set; }
- }
- 进一步,我们再为仓储编写一个分页对象的扩展方法。在DDM.DHT.Infrastructure.EFCore项目创建QueryableExtensions.cs文件
- public static class QueryableExtensions
- {
- public static async Task<PagedList<T>?> ToPageListAsync<T>(this IQueryable<T> queryable, Pagination pagination) where T : class
- {
- var count = queryable.Count();
- var items = await queryable
- .Skip((pagination.PageNumber - 1) * pagination.PageSize)
- .Take(pagination.PageSize)
- .ToListAsync();
- return items.Count == 0 ? null : new PagedList<T>(items, count, pagination);
- }
- }
七、实现审计属性自动赋值
还记得我们在DDM.DHT.Core.Common项目中创建的3个抽象实体类吗?BaseEntity、BaseAuditEntity和AuditWithUserEntity。从这3个类中我们可以发现BaseEntity基类中我们把该项目的Id属性设置成了long类型,因为我们这个项目的默认Id就都用的long,所以AuditWithUserEntity中的创建和修改人Id也设置成了long。
现在有一个问题,在仓储层做数据持久化的时候能不能自动保存这些审计对象的值了,因为他们与业务无关,仅仅是审计时需要用到。我们不想在业务代码中去设置这些属性值。
答案肯定是可以做到的,现在我们来看一下如何实现。
在实现之前我们需要先定义一个IUser接口,用来保存系统的当前登录用户。而审计数据赋的值,就是IUser接口的用户Id。
在DDM.DHT.Core.Common项目中创建Interfaces文件夹,再创建IUser.cs文件,代码如下
- public enum UserType
- {
- User,
- Worker
- }
-
- public interface IUser
- {
- long? Id { get; }
-
- string? UserName { get; }
-
- UserType UserType { get; }
- }
UserType枚举值是用来区分操作是正常登录用户,还是后台作业用户。
接着在DDM.DHT.Infrastructure.EFCore项目创建Interceptors文件夹,创建AuditEntityInterceptor.cs类,代码如下:
- public class AuditEntityInterceptor(IUser currentUser) : SaveChangesInterceptor
- {
- public override InterceptionResult<int> SavingChanges(DbContextEventData eventData, InterceptionResult<int> result)
- {
- UpdateEntities(eventData.Context);
-
- return base.SavingChanges(eventData, result);
- }
-
- public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
- InterceptionResult<int> result, CancellationToken cancellationToken = default)
- {
- UpdateEntities(eventData.Context);
-
- return base.SavingChangesAsync(eventData, result, cancellationToken);
- }
-
- public void UpdateEntities(DbContext? context)
- {
- if (context == null) return;
-
- foreach (var entry in context.ChangeTracker.Entries<BaseAuditEntity>())
- {
- if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
-
- var now = DateTime.Now;
-
- if (entry.State == EntityState.Added)
- {
- entry.Entity.CreatedAt = now;
- entry.Entity.LastModifiedAt = now;
- }
- else
- {
- entry.Entity.LastModifiedAt = now;
- }
- }
-
- foreach (var entry in context.ChangeTracker.Entries<AuditWithUserEntity>())
- {
- if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
-
- if (currentUser.Id is null) continue;
-
- if (entry.State == EntityState.Added)
- entry.Entity.CreatedBy = currentUser.Id;
- else
- entry.Entity.LastModifiedBy = currentUser.Id;
- }
- }
- }
SaveChangesInterceptor是EFCore提供的实体在SaveChange时的拦截器,我们可以提供一个基于该拦截器的实现,在实体类型是继承BaseAuditEntity时,自动保存当前时间属性。如果实现是继承AuditWithUserEntity,则自动保存当前用户为操作记录人。
八、实现命令查询职责模式
- 命令查询职责分离(CQRS,Command Query Responsibility Segregation)是一种架构模式,它将系统中的写操作与读操作分离开来。CQRS 模式能够提升系统的可伸缩性、性能和可维护性,尤其适用于复杂的业务场景和高并发的系统。在传统的 CRUD(增、删、改、查)架构中,读写操作通常共享同一数据模型,而 CQRS 将这两者彻底分开,让它们有独立的模型、接口和存储方式。
CQRS 模式特别适合与事件溯源(Event Sourcing)一起使用,可以通过事件追溯系统的状态变化,确保数据的一致性。
该系统使用MediatR来提供CQRS的支持,我们先来看接口,在DDM.DHT.Core.Common项目创建Messaging,再创建4个类,代码如下:
- //ICommand.cs
- public interface ICommand<out TResponse> : IRequest<TResponse>;
-
- //ICommandHandler.cs
- public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse> where TCommand : ICommand<TResponse>;
-
- //IQuery.cs
- public interface IQuery<out TResponse> : IRequest<TResponse>;
-
- //IQueryHandler.cs
- public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse> where TQuery : IQuery<TResponse>;
MediatR中介者模式:这是一种旨在解耦对象之间通信的策略,MediatR是实现中介模式一种成熟的实现。我们只需要知道怎么使用它即可。从本质上讲,MediatR 在三种主要模式下运行:
- Request:涉及具有服务响应的单个接收方。
- Notification:在没有服务响应的情况下与多个接收方接合。
- StreamRequest:利用单个接收器进行具有服务响应的流操作。
就该项目而言,我们主要关注Request行为,尤其是探索MediatR的管道。
- MediatR Pipeline
在中介请求流中,发布者(发送_操作_)和订阅者(处理程序)之间存在明显的区别。
通过利用 MediatR 管道,我们可以有效地拦截此流程,并将自定义逻辑引入流程。
要实现管道,需要从接口继承IPipelineBehavior。接下来我们添加一个命令日志收集器,在DDM.DHT.Infrastructure.EFCore项目Interceptors文件夹添加LoggingBehavior.cs:
- public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand<TResponse>
- {
- private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
- public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;
-
- public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
- {
- _logger.LogInformation("----- Handling command {CommandName} ({@Command})", GetGenericTypeName(request!), request);
- var response = await next();
- _logger.LogInformation("----- Command {CommandName} handled - response: {@Response}", GetGenericTypeName(request!), response);
-
- return response;
- }
- }
上面的代码所示,我们可以看到此方法允许在调用ICommand
九、实现领域事件
我们再来看一下MediatR的一种模式,Notification模式,它可以与零个或多个接受放通信,这种模式特别适合做事件发布。不让用户实体更新了,同时发出一个用户更新已经更新的事件。至于该事件被多少其他程序订阅,那是以后的事,这样事件发布者和订阅者就实现了解耦。
现在我们回到DDM.DHT.SharedKernel项目,在Domain文件夹添加BaseEvent.cs类
- public abstract class BaseEvent : INotification
- {
- /// <summary>
- /// 发生日期
- /// </summary>
- public DateTime DateOccurred { get; protected set; } = DateTime.Now;
- }
该类继承INotification接口,因此领域事件可以有零或者多个订阅者实现。
接着我们在DDM.DHT.Infrastructure.EFCore项目的Interceptors文件夹添加DispatchDomainEventsInterceptor.cs类
- public class DispatchDomainEventsInterceptor(IPublisher publisher) : SaveChangesInterceptor
- {
- public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
- {
- DispatchDomainEvents(eventData.Context).GetAwaiter().GetResult();
- return base.SavedChanges(eventData, result);
- }
-
- public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
- CancellationToken cancellationToken = default)
- {
- await DispatchDomainEvents(eventData.Context);
- return await base.SavedChangesAsync(eventData, result, cancellationToken);
- }
-
- private async Task DispatchDomainEvents(DbContext? context)
- {
- if (context == null) return;
-
- var entities = context.ChangeTracker
- .Entries<BaseEntity>()
- .Where(e => e.Entity.DomainEvents.Any())
- .Select(e => e.Entity)
- .ToList();
-
- var domainEvents = entities
- .SelectMany(e => e.DomainEvents)
- .ToList();
-
- entities.ForEach(e => e.ClearDomainEvents());
-
- foreach (var domainEvent in domainEvents)
- await publisher.Publish(domainEvent);
- }
- }
上面代码的核心语句就是最后那句
,publisher对象是通过依赖注入的IPublisher实例,这是一个MediatR框架提供的对象,它的功能就是发布事件。
这段代码同时继承SaveChangesInterceptor抽象类,因此在实体执行SavedChange时,可以确保同时执行了DispatchDomainEvents方法,实现工作单元模式。
十、实现集成事件
上一部分内容中,领域事件的实现中我们可以知道一个领域事件可以实现多个订阅者,并且同时实现了工作单元模式。看起来好像很厉害,但实际上受到EFCore的事务特性的限制,领域事件只能在一个DbContext上下文中实现事务。如果是跨微服务的事件订阅,怎么办呢?
分布式事务的实现并不是一件容易的事情,我们的项目通过DotNetCore.CAP库来实现。
首先重温一下CAP的概念:
CAP定理是分布式系统中的一个重要理论,主要由埃里克·布鲁尔在2000年提出,并在2002年由塞思·吉尔伯特和南希·林奇正式证明。CAP定理指出,在一个分布式系统中,最多只能同时满足以下三项中的两项:
- 一致性(Consistency):所有节点在同一时间看到相同的数据。
- 可用性(Availability):每个请求都会在有限的时间内得到响应。
- 分区容错性(Partition Tolerance):系统在网络分区的情况下仍能继续工作。 因此,设计者在面对网络分区时,必须在一致性和可用性之间做出选择,无法同时保证这三项特性。
DotNetCore CAP通常运用在分布式事务的场景,主要解决的是不同程序之间远程调用的事务一致性。
集成CAP的实现:
-
在shared文件夹创建新的项目DDM.DHT.Infrastructure.CAP,需要实现CAP的微服务单独引用该项目。
-
创建EFDbContext类,实现开启事务的逻辑:
- public class EFDbContext(DbContextOptions options) : DbContext(options)
- {
- private IDbContextTransaction? _currentTransaction;
- public IDbContextTransaction? GetCurrentTransaction() => _currentTransaction;
- /// <summary>
- /// 事务是否开启
- /// </summary>
- public bool HasActiveTransaction => _currentTransaction != null;
- public Task<IDbContextTransaction> BeginTransactionAsync()
- {
- if (_currentTransaction != null) return null;
- _currentTransaction = Database.BeginTransaction();
- return Task.FromResult(_currentTransaction);
- }
- public async Task CommitTransactionAsync(IDbContextTransaction transaction)
- {
- if (transaction == null) throw new ArgumentNullException(nameof(transaction));
- if (transaction != _currentTransaction) throw new InvalidOperationException($"传入的事务{transaction.TransactionId}并不是当前事务");
- try
- {
- await SaveChangesAsync();
- transaction.Commit();
- }
- catch
- {
- RollbackTransaction();
- throw;
- }
- finally
- {
- if (_currentTransaction != null)
- {
- _currentTransaction.Dispose();
- _currentTransaction = null;
- }
- }
- }
- public void RollbackTransaction()
- {
- try
- {
- _currentTransaction?.Rollback();
- }
- finally
- {
- if (_currentTransaction != null)
- {
- _currentTransaction.Dispose();
- _currentTransaction = null;
- }
- }
- }
- }
-
创建TransactionBehavior.cs类,通过拦截器开启事务
- public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFDbContext where TRequest : ICommand<TResponse>
- {
- ILogger _logger;
- TDbContext _dbContext;
- ICapPublisher _capBus;
- public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
- {
- _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
- _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
- _logger = logger ?? throw new ArgumentNullException(nameof(logger));
- }
- public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
- {
- var response = default(TResponse);
- var typeName = request.GetGenericTypeName();
- try
- {
- if (_dbContext.HasActiveTransaction)
- {
- return await next();
- }
- //定义数据库操作执行的策略,比如可以在里面嵌入一些重试的逻辑
- var strategy = _dbContext.Database.CreateExecutionStrategy();
- await strategy.ExecuteAsync(async () =>
- {
- using (var transaction = await _dbContext.BeginTransactionAsync(_capBus))
- {
- using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
- {
- _logger.LogInformation("----- 开始事务 {TransactionId} {CommandName}({@Command})", transaction.TransactionId, typeName, request);
- response = await next();
- _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);
- await _dbContext.CommitTransactionAsync(transaction);
- //Guid transactionId = transaction.TransactionId;
- }
- }
- });
- return response;
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);
- throw;
- }
- }
- }
-
创建DependencyInjection.cs类,注入CAP实例和配置
- public static class DependencyInjection
- {
- public static IServiceCollection AddCAP<TMasterDbContext>(
- this IServiceCollection services, IConfiguration configuration)
- where TMasterDbContext : DbContext
- {
- var masterDbConn = configuration.GetConnectionString("MasterDb");
- services.AddCap(x =>
- {
- x.UseEntityFramework<TMasterDbContext>();
- x.UseMySql(masterDbConn!);
- x.UseRabbitMQ(options =>
- {
- configuration.GetSection("RabbitMQ").Bind(options);
- });
- x.UseDashboard();
- });
- return services;
- }
- }
要使用CAP还需要2个步骤,目前还在框架阶段,后面两个步骤在使用的时候具体介绍,下面内容只是先把后续3个步骤写出来。
-
在要使用CAP的微服务项目中,让微服务的DbContext对象继承EFDbContext类,用来继承事务逻辑。
-
创建新的拦截器,注入,如
- public class UserDbContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<UserDbContext, TRequest, TResponse> where TRequest : ICommand<TResponse>
- {
- public UserDbContextTransactionBehavior(UserDbContext dbContext, ICapPublisher capBus, ILogger<UserDbContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger)
- {
- }
- }
这个类主要目的是把ICapPublisher注入进来
-
添加依赖注入
- services.AddTransient(typeof(IPipelineBehavior<,>), typeof(UserDbContextTransactionBehavior<,>));
ps:这里CAP是实现还依赖了RabbitMQ中间件,具体的使用效果以后遇到真实需求了再详细介绍。
到目前已经把领域层规范和EFCore基础设施搭建好了,还有一些其他基础设施层的通用封装,如Cache、Quartz、MessageBus等,这些也等以后再展开介绍。