Spiga

微服务架构(六):事件总线

2018-04-04 14:27:26

什么是事物

例如:事物 所有看到的一切都是事物,不能看到的也是事物

例如:团队微服务,成员微服务,聚合微服务,网关api,认证中心等等包括类,对象

所有的事件都是事物变化的结果

大家接触事件最早就是在js 或者是c#高级特性。大家对于事件不默认,但是对于事件不是很好理解

什么是事件

事件就是指事物状态的变化,每一次事物变化的结果都称作为事件

什么是事件总线

就是用来管理所有的事件的一种机制就称作为事件总线

包括事件发布,事件存储,事件订阅,事件处理的统称

作用:

事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

为什么要使用事件总线

将微服务系统各组件之间进行解耦

使用业务的发展来说

事件总线框架

CAP

masstransit

CAP内部概念

事件 : 就是一些状态信息

发布者:发布事件的角色 cap

订阅者:订阅消费事件的角色 cap

消息传输器:传输事件

消息存储器:存储事件

CAP存储事件消息队列类型Transport

Azure

rabbitmq

kafaka

In Memory Queue

CAP存储事件持久化类型

SQL Server

MySQL

PostgreSQL

MongoDB

InMemoryStorage

CAP事件监控

Dashboard

微服务系统中如何使用CAP

条件

  1. 微服务系统
  2. RabbitMQ
  3. SqlServer
  4. CAP

步骤

  1. 微服务系统准备

    微服务系统全部准备完毕

  2. RabbitMQ准备

    2.1 环境准备

    Erlang下载地址:https://www.erlang.org/downloads
    RabbitMQ下载地址:https://www.rabbitmq.com/download.html

    2.2 RabbitMQ 启动

    1、在安装目录下添加可视化插件

rabbitmq-plugins enable rabbitmq_management
  1. 在安装目录下启动
rabbitmq-server
  1. 查看rabbitmq状态
rabbitmqctl status
  1. 在浏览器输入http://127.0.0.1:15672
    访问rabbitmq后台系统

  2. SqlServer准备

    SqlServer启动,安装

  3. CAP准备

    4.1 CAP环境

    CAP官网地址:https://cap.dotnetcore.xyz/user-guide/zh/monitoring/dashboard/

    4.2 CPA配置

CAP Nuget DotNetCore.CAP
CAP传输器Nuget DotNetCore.CAP.RabbitMQ
CAP持久化DotNetCore.CAP.SqlServer

1.在startup.cs中添加

// 8、添加事件总线cap
            services.AddCap(x => {
                // 8.1 使用内存存储消息(消息发送失败处理)
                x.UseInMemoryStorage();

                // 8.2 使用RabbitMQ进行事件中心处理
                x.UseRabbitMQ(rb => {
                    rb.HostName = "localhost";
                    rb.UserName = "guest";
                    rb.Password = "guest";
                    rb.Port = 5672;
                    rb.VirtualHost = "/";
                });
            });

2 在AggregateController.cs中注入ICapPublisher

private readonly ICapPublisher capPublisher;
public TeamsController(ICapPublisher capPublisher)
    {
        this.capPublisher = capPublisher;
    }

3 在微服务项目2中方法上添加特性[CapSubscribe]

/// <summary>
        /// 视频添加(异步添加)
        /// </summary>
        /// <param name="Video"></param>
        /// <returns></returns>
        [NonAction]
        [CapSubscribe("tontcap")]
        public ActionResult<Video> PostVideo(Video Video)
        {
            videoService.Create(Video);
       return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
    }
  1. 效果展示

RabbitMQ宕机情况

步骤

  1. 将RabbitMQ直接关闭

事件消息无法发送,存储到内存缓存中

  1. 当将RabbitMQ启动后,消息正常发送

    内部使用定时器轮询机制实现

AggregateService宕机情况

AggregateService 执行业务成功,发送消息前宕机

使用本地消息表解决(思想:持久化操作)

条件

  1. 本地消息表

步骤

  1. 在AggregateService服务中

    1.1 创建Context文件,然后在Context文件夹内创建AggregateContext

/// <summary>
    /// Aggregate服务上下文
    /// </summary>
    public class AggregateContext : DbContext
    {
        public AggregateContext(DbContextOptions<AggregateContext> options) : base(options)
        {
        }
}

1.2 在appsettings.json中添加

{
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=.;Initial Catalog=aggregateservice;Persist Security Info=True;User ID=sa;Password="
  }
}

1.3 在startup.cs中添加消息持久化

// 9、注册上下文到IOC容器
            services.AddDbContext<AggregateContext>(options =>
            {
                options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            });
        // 8、添加事件总线cap
        services.AddCap(x => {
            // 8.1 使用EntityFramework进行存储操作
            x.UseEntityFramework<AggregateContext>();
            // 8.2 使用sqlserver进行事务处理
            x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
        // 8.2 使用RabbitMQ进行事件中心处理
        x.UseRabbitMQ(rb => {
            rb.HostName = "localhost";
            rb.UserName = "guest";
            rb.Password = "guest";
            rb.Port = 5672;
            rb.VirtualHost = "/";
        });
    });

1.4测试演示效果

`
数据库中多了两张表cap.Published和cap.Received

当业务执行成功,发送消息时,聚合微服务宕机,消息被持久化到数据库

当重启聚合微服务时,消息发送成功,被成功消费

1.5 原理

  1. 定时器 消息重试
  2. 幂等性 一个函数每次都是相同的结果,状态只有一个