Go学习笔记(九):实现Micro-Kernel(微内核模式)
2020-05-01 20:44:02什么是微内核架构
微内核架构(Microkernel Architecture),也被称为插件化架构(Plugin-in Architecture),是一种面向功能进行拆分的可扩展架构。例如 VS Code、Eclipse 这一类 IDE 软件、UNIX 操作系统等等,都是参照微内核架构设计实现的。
微内核架构的两个核心组件
微内核架构包含两类核心的组件:核心系统(Core System)和插件模块(Plug-in modules)。核心系统负责与具体功能无关的通用功能,例如应用生命周期的管理、插件模块的管理(包括:插件模块的注册、载入、卸载等等);插件模块负责实现具体的功能,例如一个 Web 框架基本上会按照功能模块拆分成如下的插件模块:路由模块、安全模块、HTTP 编解码模块等等,每个模块都通过插件实现,每一个插件都只做一件事情。
微内核基本架构示意图如下所示:
核心系统功能尽量保持稳定,不要因为插件模块的扩展而不断修改,插件模块可以根据功能需求进行不断扩展。
特点与要点
- 特点
- 易于扩展
- 错误隔离
- 保持架构⼀致性
- 要点
- 内核包含公共流程或通⽤逻辑
- 将可变或可扩展部分规划为扩展点
- 抽象扩展点⾏为,定义接⼝
- 利⽤插件进⾏扩展
实例
如下图,我们希望实现Agent在系统主机上,这个Agent可以手机文件信息、进程信息、应用信息,以及提供一个扩展点,可以扩展未来其他要收集的信息,因此我们需要提供一个Extension Point。
1.接口Collector定义
type Collector interface {
Init(evtReceiver EventReceiver) error
Start(agtCtx context.Context) error
Stop() error
Destory() error
}
type Event struct {
Source string
Content string
}
type EventReceiver interface {
OnEvent(evt Event)
}
2.定义Agent结构体
type Agent struct {
collectors map[string]Collector
evtBuf chan Event
cancel context.CancelFunc
ctx context.Context
state int
}
func (agt *Agent) EventProcessGroutine() {
var evtSeg [10]Event
for {
for i := 0; i < 10; i++ {
select {
case evtSeg[i] = <-agt.evtBuf:
case <-agt.ctx.Done():
return
}
}
fmt.Println(evtSeg)
}
}
func NewAgent(sizeEvtBuf int) *Agent {
agt := Agent{
collectors: map[string]Collector{},
evtBuf: make(chan Event, sizeEvtBuf),
state: Waiting,
}
return &agt
}
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
if agt.state != Waiting {
return WrongStateError
}
agt.collectors[name] = collector
return collector.Init(agt)
}
func (agt *Agent) startCollectors() error {
var err error
var errs CollectorsError
var mutex sync.Mutex
for name, collector := range agt.collectors {
go func(name string, collector Collector, ctx context.Context) {
defer func() {
mutex.Unlock()
}()
err = collector.Start(ctx)
mutex.Lock()
if err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}(name, collector, agt.ctx)
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) stopCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Stop(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) destoryCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Destory(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) Start() error {
if agt.state != Waiting {
return WrongStateError
}
agt.state = Running
agt.ctx, agt.cancel = context.WithCancel(context.Background())
go agt.EventProcessGroutine()
return agt.startCollectors()
}
func (agt *Agent) Stop() error {
if agt.state != Running {
return WrongStateError
}
agt.state = Waiting
agt.cancel()
return agt.stopCollectors()
}
func (agt *Agent) Destory() error {
if agt.state != Waiting {
return WrongStateError
}
return agt.destoryCollectors()
}
func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}
var WrongStateError = errors.New("can not take the operation in the current state")
type CollectorsError struct {
CollectorErrors []error
}
func (ce CollectorsError) Error() string {
var strs []string
for _, err := range ce.CollectorErrors {
strs = append(strs, err.Error())
}
return strings.Join(strs, ";")
}
3.测试
type DemoCollector struct {
evtReceiver EventReceiver
agtCtx context.Context
stopChan chan struct{}
name string
content string
}
func NewCollect(name string, content string) *DemoCollector {
return &DemoCollector{
stopChan: make(chan struct{}),
name: name,
content: content,
}
}
func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
fmt.Println("initialize collector", c.name)
c.evtReceiver = evtReceiver
return nil
}
func (c *DemoCollector) Start(agtCtx context.Context) error {
fmt.Println("start collector", c.name)
for {
select {
case <-agtCtx.Done():
c.stopChan <- struct{}{}
break
default:
time.Sleep(time.Millisecond * 50)
c.evtReceiver.OnEvent(Event{c.name, c.content})
}
}
}
func (c *DemoCollector) Stop() error {
fmt.Println("stop collector", c.name)
select {
case <-c.stopChan:
return nil
case <-time.After(time.Second * 1):
return errors.New("failed to stop for timeout")
}
}
func (c *DemoCollector) Destory() error {
fmt.Println(c.name, "released resources.")
return nil
}
func TestAgent(t *testing.T) {
agt := NewAgent(100)
c1 := NewCollect("c1", "1")
c2 := NewCollect("c2", "2")
agt.RegisterCollector("c1", c1)
agt.RegisterCollector("c2", c2)
if err := agt.Start(); err != nil {
fmt.Printf("start error %v\n", err)
}
fmt.Println(agt.Start())
time.Sleep(time.Second * 1)
agt.Stop()
agt.Destory()
}