Spiga

云原生电商微服务实战1:搭建基础框架

2024-09-07 22:29:37

  1. 一直以来想专心写一个基于云原生的微服务案例,但总角色要写的内容太多了。电商案例本身并不会太难,而要搭建好一整套云原生和微服务的基础设施,并不是一项轻松的工作。随着.Net Asprise的推出,再集成Dapr,开发和生产几乎可以保持一样的环境了。这让我惊喜,曾经我们手动搭建k8s,手动配置集群以及可以成为历史,我们需要的就是使用这些工具,重新把核心回归到业务上来。

​ 正好新公司也是做电商业务的,于是我终于决定开始写一套云原生电商微服务的案例。一方面可以学习Asprise和Dapr的相关知识,也是一次属性电商核心业务的机会。

​ 这次的内容会写得比较细,目标是把这个项目当成一个培训项目来写。让不了解云原生、不了解微服务和不了解电商业务的人,也能看得懂该项目。

一、DDD

DDD相关的文章以及很多了,我在几年前也写过一个DDD的学习系列,本文就只是做个总结,毕竟微服务是离不开DDD的。

解决什么问题

  • 问题域
  • 需求分析
  • 分析理解复杂业务领域问题
  • 准确反映业务语言

领域分析概念

  • 领域
  • 子域
  • 核心域、通用域和支撑域
  • 限界上下文

领域建模概念

  • 实体与值对象
  • 聚合与聚合根
  • 领域事件
  • 领域服务
  • 仓储
  • 工作单元模式
  • 规约
  • 应用服务
  • 防腐层

领域驱动设计

CQRS: 命令与查询分离,作为一种战术办法,是实现DDD建模领域的最佳途径之一。

充血模型: 让模型自带业务逻辑,业务属性的改变只有模型自身可以操作。

二、整洁架构

核心原则

  1. 独立于框架:整洁架构的系统核心业务逻辑不依赖于具体的软件框架,业务逻辑部分都能够独立运行。这样在框架更新或者替换时,对核心业务的影响最小。
  2. 可测试性:架构设计使得业务规则可以很方便地被测试。因为业务逻辑是独立于外部组件(如数据库、用户界面等)的,所以可以使用单元测试来验证业务规则的正确性。比如,在一个电商系统中,“计算商品折扣”的业务规则可以通过提供模拟的商品价格数据来进行单元测试,而不需要真正地连接数据库或者启动整个用户界面。
  3. 独立于UI(用户界面):业务逻辑与用户界面相互独立。这意味着可以方便地替换用户界面,比如从一个命令行界面转换为图形界面,或者从Web界面转换为移动应用界面,而不会影响到业务逻辑。
  4. 独立于数据库:系统的核心不依赖于数据库的类型和实现。不管是使用关系型数据库(如MySQL)还是非关系型数据库(如MongoDB),业务逻辑部分都能保持稳定。

洋葱架构

依赖原则:上图的同心圆代表软件的不同部分。总的来说,越往里面,代码级别越高。外面的圆是实现机制,而内层的圆是原则。

Entities:Entities封装了企业级的业务规则。一个Entity可以是一个带方法的对象,也可以是一个数据结构和方法集。Entities可以被用于企业的其他应用。

Use Cases: 这一层包含了应用特有的业务规则。它封装和实现了系统的所有用例。这些用例协调数据从entities的流入和流出,并且指导entities使用它们的企业级业务规则来达到用例的目标。

三、项目框架

上图是本电商项目每个微服务实现使用的分层架构,在具体实现每个微服务前,先为项目打好地基,编写一些通用类库。

上图是目前搭好的电商项目整体架构,其中:

  • aspire:.net 8开始提供的开发工具,用于构建和运行云原生应用程序
  • gateways:网关项目
  • services:各个微服务的实现
  • app:存放非核心域服务
  • works:后台作业
  • pakages:项目需要用到独立的可以打包的nuget包,可以拿到其他项目中使用
  • shared:项目的共享类库,为实现各个微服务提供共享的基础类。

四、领域层模型规范

本项目使用.NET 9开发,现在我们先从shared开始。

  1. 在shared文件夹创建DDD.DHT.SharedKernel类库项目,修改默认命名空间为DDD.DHT

  2. 创建文件夹Domain,分别实现如下3个类

    IEntity.cs

    1. namespace DDM.DHT.Domain;
    2.  
    3. public interface IEntity;
    4.  
    5. public interface IEntity<TId> : IEntity
    6. {
    7. TId Id { get; set; }
    8. }

    IAggregateRoot.cs

    1. namespace DDM.DHT.Domain;
    2.  
    3. public interface IAggregateRoot;

    BaseEntity.cs

    1. using System.ComponentModel.DataAnnotations;
    2. using System.ComponentModel.DataAnnotations.Schema;
    3.  
    4. namespace DDM.DHT.Domain;
    5.  
    6. public abstract class BaseEntity<TId> : IEntity<TId>
    7. {
    8. private readonly List<BaseEvent> _domainEvents = [];
    9.  
    10. [NotMapped]
    11. public IReadOnlyCollection<BaseEvent> DomainEvents => _domainEvents.AsReadOnly();
    12.  
    13. [Key]
    14. public virtual TId Id { get; set; } = default!;
    15.  
    16. public void AddDomainEvent(BaseEvent domainEvent)
    17. {
    18. _domainEvents.Add(domainEvent);
    19. }
    20.  
    21. public void RemoveDomainEvent(BaseEvent domainEvent)
    22. {
    23. _domainEvents.Remove(domainEvent);
    24. }
    25.  
    26. public void ClearDomainEvents()
    27. {
    28. _domainEvents.Clear();
    29. }
    30. }
  3. 在shared文件夹创建DDM.DHT.Core.Common类库项目,也修改默认命名空间为DDD.DHT。这个类库与SharedEvent的区别是,该类库实现主要是针对本项目的。而SharedEvent实际上可以做出nuget包供其他项目使用

  4. 在DDM.DHT.Core.Common项目中创建Domain文件夹,实现如下3个类:

    BaseEntity.cs

    1. namespace DDM.DHT.Domain;
    2.  
    3. public abstract class BaseEntity : BaseEntity<long>;

    BaseAuditEntity.cs

    1. namespace DDM.DHT.Domain;
    2.  
    3. public abstract class BaseAuditEntity : BaseEntity
    4. {
    5. public DateTime? CreatedAt { get; set; }
    6. public DateTime? LastModifiedAt { get; set; }
    7. }

    AuditWithUserEntity.cs

    1. namespace DDM.DHT.Domain;
    2.  
    3. public abstract class AuditWithUserEntity : BaseAuditEntity
    4. {
    5. public long? CreatedBy { get; set; }
    6. public long? LastModifiedBy { get; set; }
    7. }

五、通用仓储规范

仓储要重点说一下,按照DDD的要求,所有聚合的属性都是通过聚合根来操作的,而仓储是用来实现数据持久化的。因此根据DDD的要求,只有聚合根才会有仓储接口和具体实现。然而在具体开发中,这种约束会使得代码变得复杂,有些简单的,可能仅仅只有CRUD的实体,因为不是聚合根,没有自己的仓储实现,设计者可能被迫把这类实体也设计成聚合根了。

读过ABP源码的朋友应该清楚,ABP的仓储就不是直接约束的IAggregateRoot,而是改成了IEntity。这样完全放开,又存在未来开发者随便修改聚合属性,破坏聚合根的管辖职责。

结合这些情况,本项目对仓储基类的设计做了一些特别的约束。见代码分析:

  1. 在DDM.DHT.SharedKernel项目添加Repository文件夹,创建IReadRepository.cs

    1. public interface IReadRepository<T> where T : class, IEntity
    2. {
    3. Task<T?> GetByIdAsync<TKey>(TKey id, CancellationToken cancellationToken = default);
    4.  
    5. Task<List<T>> GetListAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    6.  
    7. Task<T?> GetSingleOrDefaultAsync(ISpecification<T>? specification = null,
    8. CancellationToken cancellationToken = default);
    9.  
    10. Task<int> CountAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    11.  
    12. Task<bool> AnyAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    13. }

    注意看,这个接口的泛型约束是IEntity,而接口方法只是定义了一些读方法。如果只是读数据,该仓储是不会破坏到聚合内部的数据的,因此约束可以放开到IEntity。

    另外ISpecification这个接口是一个规约接口,为仓储提供非id属性的条件查询实现。

    Specification.cs实现如下

    1. public abstract class Specification<T> : ISpecification<T> where T : class, IEntity
    2. {
    3. public Expression<Func<T, bool>>? FilterCondition { get; protected init; }
    4. public List<Expression<Func<T, object>>> Includes { get; } = [];
    5. public List<string> IncludeStrings { get; } = [];
    6. public Expression<Func<T, object>>? OrderBy { get; private set; }
    7. public Expression<Func<T, object>>? OrderByDescending { get; private set; }
    8. public Expression<Func<T, object>>? GroupBy { get; private set; }
    9. public int Take { get; private set; }
    10. public int Skip { get; private set; }
    11. public bool IsPagingEnabled { get; private set; }
    12.  
    13. protected void AddInclude(Expression<Func<T, object>> includeExpression)
    14. {
    15. Includes.Add(includeExpression);
    16. }
    17.  
    18. protected void AddInclude(string includeString)
    19. {
    20. IncludeStrings.Add(includeString);
    21. }
    22.  
    23. protected void SetPaging(int skip, int take)
    24. {
    25. Skip = skip;
    26. Take = take;
    27. IsPagingEnabled = true;
    28. }
    29.  
    30. protected void SetOrderBy(Expression<Func<T, object>> orderByExpression)
    31. {
    32. OrderBy = orderByExpression;
    33. }
    34.  
    35. protected void SetOrderByDescending(Expression<Func<T, object>> orderByDescExpression)
    36. {
    37. OrderByDescending = orderByDescExpression;
    38. }
    39.  
    40. protected void SetGroupBy(Expression<Func<T, object>> groupByExpression)
    41. {
    42. GroupBy = groupByExpression;
    43. }
    44. }
  2. 接着我们继续创建IRepository.cs类

    1. //该仓储操作的聚合根实体类型
    2. public interface IRepository<T> : IReadRepository<T> where T : class, IEntity, IAggregateRoot
    3. {
    4. T Add(T entity);
    5.  
    6. void Update(T entity);
    7.  
    8. void Delete(T entity);
    9.  
    10. Task<int> BatchDeleteAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    11.  
    12. Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    13. }

    这个仓储接口继承IReadRepository,同时它的泛型约束添加了IAggregateRoot,也就是说它既拥有了能灵活查询实体对象的优势,又提供了仅聚合根能操作实体属性变更的约束。

  3. 我们继续添加一个IGenericRepository仓储,代码如下:

    1. //该仓储操作的通用实体类型
    2. public interface IGenericRepository<T> : IReadRepository<T> where T : class, IEntity
    3. {
    4. Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    5. }

    这个接口很有意思,它继承IReadRepository仓储后,多了一个SaveChangesAsync方法。也就是说这个仓储可以把查询获取的实体对象属性变更后,重新持久化。

    再看这个仓储接口的约束是IEntity,也就是说它为那些简单的只需要做CRUD的实体提供了简单了仓储实现,而不依赖与聚合根。

  4. 在shared文件夹,创建DDM.DHT.Infrastructure.EFCore项目。添加Pomelo.EntityFrameworkCore.MySql 9.0.0引用,注意目前该版本是preview的版本。

  5. 创建Repositories文件夹,分别去实现IReadRepository、IRepository和IGenericRepository接口。代码我这类不贴了,可下载完整的项目文件来查看。

六、通用数据返回对象

一般我们在开发项目时,每个接口要返回数据类型是不一样的,比如有些接口返回的是整数、有的返回的是List等等,那么前端在解析不同的返回数据类型时就会很麻烦,为了解决这个问题,需要对返回结果进行统一的封装。

不仅仅是前端访问接口,即使是后端程序各个服务之间,或者各个层次之间都会有数据返回。

因此需要规范一个通用的数据返回对象。

  1. 在DDD.DHT.SharedKernel项目添加Return文件夹,创建统一的IResult接口和实现,代码如下。
  1. public enum ResultStatus
  2. {
  3. Ok,
  4. Error,
  5. Forbidden,
  6. Unauthorized,
  7. NotFound,
  8. Invalid
  9. }
  10.  
  11. public interface IResult
  12. {
  13. IEnumerable<string>? Errors { get; }
  14.  
  15. bool IsSuccess { get; }
  16.  
  17. ResultStatus Status { get; }
  18.  
  19. object? GetValue();
  20. }
  21.  
  22. public class Result<T> : IResult
  23. {
  24. public Result() : this(default(T))
  25. {
  26. }
  27.  
  28. protected internal Result(T? value)
  29. {
  30. Value = value;
  31. }
  32.  
  33. protected internal Result(ResultStatus status)
  34. {
  35. Status = status;
  36. }
  37.  
  38. public T? Value { get; init; }
  39.  
  40. public bool IsSuccess => Status == ResultStatus.Ok;
  41.  
  42. public IEnumerable<string>? Errors { get; protected set; }
  43.  
  44. public ResultStatus Status { get; protected set; } = ResultStatus.Ok;
  45.  
  46. public object? GetValue()
  47. {
  48. return Value;
  49. }
  50.  
  51. public static implicit operator Result<T>(Result result)
  52. {
  53. return new Result<T>(default(T))
  54. {
  55. Status = result.Status,
  56. Errors = result.Errors
  57. };
  58. }
  59. }
  60.  
  61. public class Result : Result<Result>
  62. {
  63. public Result() : base(null)
  64. {
  65. }
  66.  
  67. protected internal Result(Result value) : base(value)
  68. {
  69. }
  70.  
  71. protected internal Result(ResultStatus status) : base(status)
  72. {
  73. }
  74.  
  75. public static Result From(IResult result)
  76. {
  77. return new Result(result.Status)
  78. {
  79. Errors = result.Errors
  80. };
  81. }
  82.  
  83. public static Result Success()
  84. {
  85. return new Result(ResultStatus.Ok);
  86. }
  87.  
  88. public static Result<T> Success<T>(T value)
  89. {
  90. return new Result<T>(value);
  91. }
  92.  
  93. public static Result Failure()
  94. {
  95. return new Result(ResultStatus.Error);
  96. }
  97.  
  98. public static Result Failure(params string[] errors)
  99. {
  100. return new Result(ResultStatus.Error)
  101. {
  102. Errors = errors.AsEnumerable()
  103. };
  104. }
  105.  
  106. public static Result NotFound()
  107. {
  108. return new Result(ResultStatus.NotFound);
  109. }
  110.  
  111. public static Result NotFound(params string[] error)
  112. {
  113. return new Result(ResultStatus.NotFound)
  114. {
  115. Errors = error.AsEnumerable()
  116. };
  117. }
  118.  
  119. public static Result Forbidden()
  120. {
  121. return new Result(ResultStatus.Forbidden);
  122. }
  123.  
  124. public static Result Unauthorized()
  125. {
  126. return new Result(ResultStatus.Unauthorized);
  127. }
  128.  
  129. public static Result Invalid()
  130. {
  131. return new Result(ResultStatus.Invalid);
  132. }
  133.  
  134. public static Result Invalid(params string[] errors)
  135. {
  136. return new Result(ResultStatus.Invalid)
  137. {
  138. Errors = errors.AsEnumerable()
  139. };
  140. }
  141. }
  1. 有了通用的泛型返回对象,我们再补偿一个通用的分页数据返回对象
  1. public class PagedMetaData
  2. {
  3. public int CurrentPage { get; set; }
  4. public int TotalPages { get; set; }
  5. public int PageSize { get; set; }
  6. public long TotalCount { get; set; }
  7.  
  8. public bool HasPrevious => CurrentPage > 1;
  9.  
  10. public bool HasNext => CurrentPage < TotalPages;
  11. }
  12.  
  13. public class Pagination
  14. {
  15. private const int MaxPageSize = 100;
  16.  
  17. public int PageNumber { get; set; } = 1;
  18.  
  19. private int _pageSize = 10;
  20.  
  21. public int PageSize
  22. {
  23. get => _pageSize;
  24. set => _pageSize = value > MaxPageSize ? MaxPageSize : value;
  25. }
  26. }
  27.  
  28. public class PagedList<T> : List<T>
  29. {
  30. public PagedList(IEnumerable<T> items, long count, Pagination pagination)
  31. {
  32. MetaData = new PagedMetaData
  33. {
  34. TotalCount = count,
  35. PageSize = pagination.PageSize,
  36. CurrentPage = pagination.PageNumber,
  37. TotalPages = (int)Math.Ceiling(count / (double)pagination.PageSize)
  38. };
  39.  
  40. AddRange(items);
  41. }
  42.  
  43. public PagedMetaData MetaData { get; set; }
  44. }
  1. 进一步,我们再为仓储编写一个分页对象的扩展方法。在DDM.DHT.Infrastructure.EFCore项目创建QueryableExtensions.cs文件
  1. public static class QueryableExtensions
  2. {
  3. public static async Task<PagedList<T>?> ToPageListAsync<T>(this IQueryable<T> queryable, Pagination pagination) where T : class
  4. {
  5. var count = queryable.Count();
  6. var items = await queryable
  7. .Skip((pagination.PageNumber - 1) * pagination.PageSize)
  8. .Take(pagination.PageSize)
  9. .ToListAsync();
  10. return items.Count == 0 ? null : new PagedList<T>(items, count, pagination);
  11. }
  12. }

七、实现审计属性自动赋值

还记得我们在DDM.DHT.Core.Common项目中创建的3个抽象实体类吗?BaseEntity、BaseAuditEntity和AuditWithUserEntity。从这3个类中我们可以发现BaseEntity基类中我们把该项目的Id属性设置成了long类型,因为我们这个项目的默认Id就都用的long,所以AuditWithUserEntity中的创建和修改人Id也设置成了long。

现在有一个问题,在仓储层做数据持久化的时候能不能自动保存这些审计对象的值了,因为他们与业务无关,仅仅是审计时需要用到。我们不想在业务代码中去设置这些属性值。

答案肯定是可以做到的,现在我们来看一下如何实现。

在实现之前我们需要先定义一个IUser接口,用来保存系统的当前登录用户。而审计数据赋的值,就是IUser接口的用户Id。

在DDM.DHT.Core.Common项目中创建Interfaces文件夹,再创建IUser.cs文件,代码如下

  1. public enum UserType
  2. {
  3. User,
  4. Worker
  5. }
  6.  
  7. public interface IUser
  8. {
  9. long? Id { get; }
  10.  
  11. string? UserName { get; }
  12.  
  13. UserType UserType { get; }
  14. }

UserType枚举值是用来区分操作是正常登录用户,还是后台作业用户。

接着在DDM.DHT.Infrastructure.EFCore项目创建Interceptors文件夹,创建AuditEntityInterceptor.cs类,代码如下:

  1. public class AuditEntityInterceptor(IUser currentUser) : SaveChangesInterceptor
  2. {
  3. public override InterceptionResult<int> SavingChanges(DbContextEventData eventData, InterceptionResult<int> result)
  4. {
  5. UpdateEntities(eventData.Context);
  6.  
  7. return base.SavingChanges(eventData, result);
  8. }
  9.  
  10. public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
  11. InterceptionResult<int> result, CancellationToken cancellationToken = default)
  12. {
  13. UpdateEntities(eventData.Context);
  14.  
  15. return base.SavingChangesAsync(eventData, result, cancellationToken);
  16. }
  17.  
  18. public void UpdateEntities(DbContext? context)
  19. {
  20. if (context == null) return;
  21.  
  22. foreach (var entry in context.ChangeTracker.Entries<BaseAuditEntity>())
  23. {
  24. if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
  25.  
  26. var now = DateTime.Now;
  27.  
  28. if (entry.State == EntityState.Added)
  29. {
  30. entry.Entity.CreatedAt = now;
  31. entry.Entity.LastModifiedAt = now;
  32. }
  33. else
  34. {
  35. entry.Entity.LastModifiedAt = now;
  36. }
  37. }
  38.  
  39. foreach (var entry in context.ChangeTracker.Entries<AuditWithUserEntity>())
  40. {
  41. if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
  42.  
  43. if (currentUser.Id is null) continue;
  44.  
  45. if (entry.State == EntityState.Added)
  46. entry.Entity.CreatedBy = currentUser.Id;
  47. else
  48. entry.Entity.LastModifiedBy = currentUser.Id;
  49. }
  50. }
  51. }

SaveChangesInterceptor是EFCore提供的实体在SaveChange时的拦截器,我们可以提供一个基于该拦截器的实现,在实体类型是继承BaseAuditEntity时,自动保存当前时间属性。如果实现是继承AuditWithUserEntity,则自动保存当前用户为操作记录人。

八、实现命令查询职责模式

  1. 命令查询职责分离(CQRS,Command Query Responsibility Segregation)是一种架构模式,它将系统中的写操作读操作分离开来。CQRS 模式能够提升系统的可伸缩性、性能和可维护性,尤其适用于复杂的业务场景和高并发的系统。在传统的 CRUD(增、删、改、查)架构中,读写操作通常共享同一数据模型,而 CQRS 将这两者彻底分开,让它们有独立的模型、接口和存储方式。

CQRS 模式特别适合与事件溯源(Event Sourcing)一起使用,可以通过事件追溯系统的状态变化,确保数据的一致性。

该系统使用MediatR来提供CQRS的支持,我们先来看接口,在DDM.DHT.Core.Common项目创建Messaging,再创建4个类,代码如下:

  1. //ICommand.cs
  2. public interface ICommand<out TResponse> : IRequest<TResponse>;
  3.  
  4. //ICommandHandler.cs
  5. public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse> where TCommand : ICommand<TResponse>;
  6.  
  7. //IQuery.cs
  8. public interface IQuery<out TResponse> : IRequest<TResponse>;
  9.  
  10. //IQueryHandler.cs
  11. public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse> where TQuery : IQuery<TResponse>;

MediatR中介者模式:这是一种旨在解耦对象之间通信的策略,MediatR是实现中介模式一种成熟的实现。我们只需要知道怎么使用它即可。从本质上讲,MediatR 在三种主要模式下运行:

  • Request:涉及具有服务响应的单个接收方。
  • Notification:在没有服务响应的情况下与多个接收方接合。
  • StreamRequest:利用单个接收器进行具有服务响应的流操作。

就该项目而言,我们主要关注Request行为,尤其是探索MediatR的管道。

  1. MediatR Pipeline

在中介请求流中,发布者(发送_操作_)和订阅者(处理程序)之间存在明显的区别。

通过利用 MediatR 管道,我们可以有效地拦截此流程,并将自定义逻辑引入流程。

要实现管道,需要从接口继承IPipelineBehavior。接下来我们添加一个命令日志收集器,在DDM.DHT.Infrastructure.EFCore项目Interceptors文件夹添加LoggingBehavior.cs:

  1. public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand<TResponse>
  2. {
  3. private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
  4. public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;
  5.  
  6. public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
  7. {
  8. _logger.LogInformation("----- Handling command {CommandName} ({@Command})", GetGenericTypeName(request!), request);
  9. var response = await next();
  10. _logger.LogInformation("----- Command {CommandName} handled - response: {@Response}", GetGenericTypeName(request!), response);
  11.  
  12. return response;
  13. }
  14. }

上面的代码所示,我们可以看到此方法允许在调用ICommand的管道的后续步骤之前和之后插入逻辑。假设这些命令的日志都被记录到CommandEvent的数据库,或者其他持久化存储中。我们后续时候可以对分析这些日志,实现事件溯源。

九、实现领域事件

我们再来看一下MediatR的一种模式,Notification模式,它可以与零个或多个接受放通信,这种模式特别适合做事件发布。不让用户实体更新了,同时发出一个用户更新已经更新的事件。至于该事件被多少其他程序订阅,那是以后的事,这样事件发布者和订阅者就实现了解耦。

现在我们回到DDM.DHT.SharedKernel项目,在Domain文件夹添加BaseEvent.cs类

  1. public abstract class BaseEvent : INotification
  2. {
  3. /// <summary>
  4. /// 发生日期
  5. /// </summary>
  6. public DateTime DateOccurred { get; protected set; } = DateTime.Now;
  7. }

该类继承INotification接口,因此领域事件可以有零或者多个订阅者实现。

接着我们在DDM.DHT.Infrastructure.EFCore项目的Interceptors文件夹添加DispatchDomainEventsInterceptor.cs类

  1. public class DispatchDomainEventsInterceptor(IPublisher publisher) : SaveChangesInterceptor
  2. {
  3. public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
  4. {
  5. DispatchDomainEvents(eventData.Context).GetAwaiter().GetResult();
  6. return base.SavedChanges(eventData, result);
  7. }
  8.  
  9. public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
  10. CancellationToken cancellationToken = default)
  11. {
  12. await DispatchDomainEvents(eventData.Context);
  13. return await base.SavedChangesAsync(eventData, result, cancellationToken);
  14. }
  15.  
  16. private async Task DispatchDomainEvents(DbContext? context)
  17. {
  18. if (context == null) return;
  19.  
  20. var entities = context.ChangeTracker
  21. .Entries<BaseEntity>()
  22. .Where(e => e.Entity.DomainEvents.Any())
  23. .Select(e => e.Entity)
  24. .ToList();
  25.  
  26. var domainEvents = entities
  27. .SelectMany(e => e.DomainEvents)
  28. .ToList();
  29.  
  30. entities.ForEach(e => e.ClearDomainEvents());
  31.  
  32. foreach (var domainEvent in domainEvents)
  33. await publisher.Publish(domainEvent);
  34. }
  35. }

上面代码的核心语句就是最后那句

  1. await publisher.Publish(domainEvent)
,publisher对象是通过依赖注入的IPublisher实例,这是一个MediatR框架提供的对象,它的功能就是发布事件。

这段代码同时继承SaveChangesInterceptor抽象类,因此在实体执行SavedChange时,可以确保同时执行了DispatchDomainEvents方法,实现工作单元模式。

十、实现集成事件

上一部分内容中,领域事件的实现中我们可以知道一个领域事件可以实现多个订阅者,并且同时实现了工作单元模式。看起来好像很厉害,但实际上受到EFCore的事务特性的限制,领域事件只能在一个DbContext上下文中实现事务。如果是跨微服务的事件订阅,怎么办呢?

分布式事务的实现并不是一件容易的事情,我们的项目通过DotNetCore.CAP库来实现。

首先重温一下CAP的概念:

CAP定理是分布式系统中的一个重要理论,主要由埃里克·布鲁尔在2000年提出,并在2002年由塞思·吉尔伯特南希·林奇正式证明。CAP定理指出,在一个分布式系统中,最多只能同时满足以下三项中的两项:

  • 一致性(Consistency):所有节点在同一时间看到相同的数据。
  • 可用性(Availability):每个请求都会在有限的时间内得到响应。
  • 分区容错性(Partition Tolerance):系统在网络分区的情况下仍能继续工作。 因此,设计者在面对网络分区时,必须在一致性和可用性之间做出选择,无法同时保证这三项特性。

DotNetCore CAP通常运用在分布式事务的场景,主要解决的是不同程序之间远程调用的事务一致性。

集成CAP的实现

  1. 在shared文件夹创建新的项目DDM.DHT.Infrastructure.CAP,需要实现CAP的微服务单独引用该项目。

  2. 创建EFDbContext类,实现开启事务的逻辑:

    1. public class EFDbContext(DbContextOptions options) : DbContext(options)
    2. {
    3. private IDbContextTransaction? _currentTransaction;
    4. public IDbContextTransaction? GetCurrentTransaction() => _currentTransaction;
    5. /// <summary>
    6. /// 事务是否开启
    7. /// </summary>
    8. public bool HasActiveTransaction => _currentTransaction != null;
    9.  
    10. public Task<IDbContextTransaction> BeginTransactionAsync()
    11. {
    12. if (_currentTransaction != null) return null;
    13. _currentTransaction = Database.BeginTransaction();
    14. return Task.FromResult(_currentTransaction);
    15. }
    16.  
    17. public async Task CommitTransactionAsync(IDbContextTransaction transaction)
    18. {
    19. if (transaction == null) throw new ArgumentNullException(nameof(transaction));
    20. if (transaction != _currentTransaction) throw new InvalidOperationException($"传入的事务{transaction.TransactionId}并不是当前事务");
    21.  
    22. try
    23. {
    24. await SaveChangesAsync();
    25. transaction.Commit();
    26. }
    27. catch
    28. {
    29. RollbackTransaction();
    30. throw;
    31. }
    32. finally
    33. {
    34. if (_currentTransaction != null)
    35. {
    36. _currentTransaction.Dispose();
    37. _currentTransaction = null;
    38. }
    39. }
    40. }
    41.  
    42. public void RollbackTransaction()
    43. {
    44. try
    45. {
    46. _currentTransaction?.Rollback();
    47. }
    48. finally
    49. {
    50. if (_currentTransaction != null)
    51. {
    52. _currentTransaction.Dispose();
    53. _currentTransaction = null;
    54. }
    55. }
    56. }
    57. }
  3. 创建TransactionBehavior.cs类,通过拦截器开启事务

    1. public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFDbContext where TRequest : ICommand<TResponse>
    2. {
    3. ILogger _logger;
    4. TDbContext _dbContext;
    5. ICapPublisher _capBus;
    6. public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
    7. {
    8. _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
    9. _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
    10. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    11. }
    12.  
    13.  
    14. public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    15. {
    16. var response = default(TResponse);
    17. var typeName = request.GetGenericTypeName();
    18.  
    19. try
    20. {
    21. if (_dbContext.HasActiveTransaction)
    22. {
    23. return await next();
    24. }
    25. //定义数据库操作执行的策略,比如可以在里面嵌入一些重试的逻辑
    26. var strategy = _dbContext.Database.CreateExecutionStrategy();
    27.  
    28. await strategy.ExecuteAsync(async () =>
    29. {
    30. using (var transaction = await _dbContext.BeginTransactionAsync(_capBus))
    31. {
    32. using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
    33. {
    34. _logger.LogInformation("----- 开始事务 {TransactionId} {CommandName}({@Command})", transaction.TransactionId, typeName, request);
    35. response = await next();
    36. _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);
    37.  
    38. await _dbContext.CommitTransactionAsync(transaction);
    39. //Guid transactionId = transaction.TransactionId;
    40. }
    41. }
    42. });
    43. return response;
    44. }
    45. catch (Exception ex)
    46. {
    47. _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);
    48. throw;
    49. }
    50. }
    51. }
  4. 创建DependencyInjection.cs类,注入CAP实例和配置

    1. public static class DependencyInjection
    2. {
    3. public static IServiceCollection AddCAP<TMasterDbContext>(
    4. this IServiceCollection services, IConfiguration configuration)
    5. where TMasterDbContext : DbContext
    6. {
    7. var masterDbConn = configuration.GetConnectionString("MasterDb");
    8.  
    9. services.AddCap(x =>
    10. {
    11. x.UseEntityFramework<TMasterDbContext>();
    12. x.UseMySql(masterDbConn!);
    13. x.UseRabbitMQ(options =>
    14. {
    15. configuration.GetSection("RabbitMQ").Bind(options);
    16. });
    17. x.UseDashboard();
    18. });
    19.  
    20. return services;
    21. }
    22. }

    要使用CAP还需要2个步骤,目前还在框架阶段,后面两个步骤在使用的时候具体介绍,下面内容只是先把后续3个步骤写出来。

  5. 在要使用CAP的微服务项目中,让微服务的DbContext对象继承EFDbContext类,用来继承事务逻辑。

  6. 创建新的拦截器,注入,如

    1. public class UserDbContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<UserDbContext, TRequest, TResponse> where TRequest : ICommand<TResponse>
    2. {
    3. public UserDbContextTransactionBehavior(UserDbContext dbContext, ICapPublisher capBus, ILogger<UserDbContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger)
    4. {
    5. }
    6. }

    这个类主要目的是把ICapPublisher注入进来

  7. 添加依赖注入

    1. services.AddTransient(typeof(IPipelineBehavior<,>), typeof(UserDbContextTransactionBehavior<,>));

ps:这里CAP是实现还依赖了RabbitMQ中间件,具体的使用效果以后遇到真实需求了再详细介绍。

到目前已经把领域层规范和EFCore基础设施搭建好了,还有一些其他基础设施层的通用封装,如Cache、Quartz、MessageBus等,这些也等以后再展开介绍。