Spiga

Go学习笔记(八):实现Pipe-Filter(管道过滤器)

2020-04-27 22:50:03

Pipe-Filter 模式,即管道过滤器模式,这是一种非常经典的架构模式,这种模式与工业制造生产流水线非常类似,就像薯片的生产过程,从土豆的清洗、去皮、切片、烘干、油炸,到最后打包完成,整个生产过程被拆分成了多个环节,每个环节处理完成之后,通过传送带传送到下一个环节的机器。整个生产过程每个环节都是独立的,但又环环相扣,只要有一个环节出问题了,生产出来的薯片就会有质量问题。

适用的场景

  • ⾮常适合与数据处理及数据分析系统
  • Filter封装数据处理的功能
  • Pipe⽤于连接Filter传递数据或者在异步处理过程中缓冲数据流
    • 进程内同步调⽤时,pipe演变为数据在⽅法调⽤间传递
  • 松耦合:Filter只跟数据(格式)耦合

Filter 和组合模式

23 个经典设计模式里面有一个设计模式叫组合模式,当 Pipe-Filter 遇上组合模式时,多个 Filter 又可以再组合成一个新的 Filter,如下图所示,组合出来的 Filter 接收的数据与第一个 Filter 保持一致,返回的数据与最后一个 Filter 保持一致。通过组合,就可以将多个简单的 Filter 可以组合成一个更复杂的 Filter。应用这一套理论去实践,我们会发现,Filter 既可以做的很轻便,也可以做得很强大。

实例

接下来我们用Go语言实现上图实例:
1.定义filter接口

// Package pipefilter is to define the interfaces and the structures for pipe-filter style implementation
package pipefilter

// Request is the input of the filter
type Request interface{}

// Response is the output of the filter
type Response interface{}

// Filter interface is the definition of the data processing components
// Pipe-Filter structure
type Filter interface {
	Process(data Request) (Response, error)
}

2.实现SplitFilter、ToIntFilter和SumFilter

package pipefilter

import (
	"errors"
	"strings"
)

var SplitFilterWrongFormatError = errors.New("input data should be string")

type SplitFilter struct {
	delimiter string
}

func NewSplitFilter(delimiter string) *SplitFilter {
	return &SplitFilter{delimiter}
}

func (sf *SplitFilter) Process(data Request) (Response, error) {
	str, ok := data.(string) //检查数据格式/类型,是否可以处理
	if !ok {
		return nil, SplitFilterWrongFormatError
	}
	parts := strings.Split(str, sf.delimiter)
	return parts, nil
}

package pipefilter

import (
	"errors"
	"strconv"
)

var ToIntFilterWrongFormatError = errors.New("input data should be []string")

type ToIntFilter struct {
}

func NewToIntFilter() *ToIntFilter {
	return &ToIntFilter{}
}

func (tif *ToIntFilter) Process(data Request) (Response, error) {
	parts, ok := data.([]string)
	if !ok {
		return nil, ToIntFilterWrongFormatError
	}
	ret := []int{}
	for _, part := range parts {
		s, err := strconv.Atoi(part)
		if err != nil {
			return nil, err
		}
		ret = append(ret, s)
	}
	return ret, nil
}

package pipefilter

import "errors"

var SumFilterWrongFormatError = errors.New("input data should be []int")

type SumFilter struct {
}

func NewSumFilter() *SumFilter {
	return &SumFilter{}
}

func (sf *SumFilter) Process(data Request) (Response, error) {
	elems, ok := data.([]int)
	if !ok {
		return nil, SumFilterWrongFormatError
	}
	ret := 0
	for _, elem := range elems {
		ret += elem
	}
	return ret, nil
}

3.组合3个Filter

package pipefilter

// NewStraightPipeline create a new StraightPipelineWithWallTime
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
	return &StraightPipeline{
		Name:    name,
		Filters: &filters,
	}
}

// StraightPipeline is composed of the filters, and the filters are piled as a straigt line.
type StraightPipeline struct {
	Name    string
	Filters *[]Filter
}

// Process is to process the coming data by the pipeline
func (f *StraightPipeline) Process(data Request) (Response, error) {
	var ret interface{}
	var err error
	for _, filter := range *f.Filters {
		ret, err = filter.Process(data)
		if err != nil {
			return ret, err
		}
		data = ret
	}
	return ret, err
}

4.测试调用

func TestStraightPipeline(t *testing.T) {
	spliter := NewSplitFilter(",")
	converter := NewToIntFilter()
	sum := NewSumFilter()
	sp := NewStraightPipeline("p1", spliter, converter, sum)
	ret, err := sp.Process("1,2,3")
	if err != nil {
		t.Fatal(err)
	}
	if ret != 6 {
		t.Fatalf("The expected is 6, but the actual is %d", ret)
	}
}