1 Star 0 Fork 0

hez2010/rxgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
filters.go 8.78 KB
一键复制 编辑 原始数据 按行查看 历史
hez2010 提交于 2020-11-10 10:38 +08:00 . fix: trans -> filters
package rxgo
import (
"context"
"reflect"
"sync"
)
// filter node implementation of streamOperator
type filtersOperater struct {
opFunc func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool)
}
func (flop filtersOperater) op(ctx context.Context, o *Observable) {
// must hold defintion of flow resourcs here, such as chan etc., that is allocated when connected
// this resurces may be changed when operation routine is running.
in := o.pred.outflow
out := o.outflow
//fmt.Println(o.name, "operator in/out chan ", in, out)
var wg sync.WaitGroup
go func() {
end := false
for x := range in {
if end {
continue
}
// can not pass a interface as parameter (pointer) to gorountion for it may change its value outside!
xv := reflect.ValueOf(x)
// send an error to stream if the flip not accept error
if e, ok := x.(error); ok && !o.flip_accept_error {
o.sendToFlow(ctx, e, out)
continue
}
// scheduler
switch threading := o.threading; threading {
case ThreadingDefault:
if flop.opFunc(ctx, o, xv, out) {
end = true
}
case ThreadingIO:
fallthrough
case ThreadingComputing:
wg.Add(1)
go func() {
defer wg.Done()
if flop.opFunc(ctx, o, xv, out) {
end = true
}
}()
default:
}
}
wg.Wait() //waiting all go-routines completed
o.closeFlow(out)
}()
}
// Skip skip specified number of items
func (parent *Observable) Skip(count int) (o *Observable) {
o = parent.newFilterObservable("skip")
o.skip = count
o.index = 0
o.operator = skipOperator
return o
}
var skipOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
if o.index > o.skip {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// Last take the last item
func (parent *Observable) Last() (o *Observable) {
o = parent.newFilterObservable("last")
o.index = 0
o.operator = lastOperator
return o
}
var lastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
hasElement := false
o.pred.ElementAt(o.index).Subscribe(func(res interface{}) {
hasElement = true
})
if !hasElement {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// Take take specified number of items
func (parent *Observable) Take(count int) (o *Observable) {
o = parent.newFilterObservable("take")
o.take = count
o.index = 0
o.operator = takeOperator
return o
}
var takeOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
if o.index <= o.take {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// TakeLast take last specified number of items
func (parent *Observable) TakeLast(count int) (o *Observable) {
o = parent.newFilterObservable("takeLast")
o.take = count
o.index = 0
o.operator = takeLastOperator
return o
}
var takeLastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
take := true
o.pred.ElementAt(o.index + o.take - 1).Subscribe(func(res interface{}) {
take = false
})
if take {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// SkipLast take last specified number of items
func (parent *Observable) SkipLast(count int) (o *Observable) {
o = parent.newFilterObservable("skipLast")
o.skip = count
o.index = 0
o.operator = skipLastOperator
return o
}
var skipLastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
take := false
o.pred.ElementAt(o.index + o.skip - 1).Subscribe(func(res interface{}) {
take = true
})
if take {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// ElementAt take the first item
func (parent *Observable) ElementAt(index int) (o *Observable) {
o = parent.newFilterObservable("elementAt")
o.index = 0
o.take = index
o.operator = elementAtOperator
return o
}
var elementAtOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
if o.index-1 == o.take {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// First take the first item
func (parent *Observable) FirstWhile(f interface{}) (o *Observable) {
// check validation of f
fv := reflect.ValueOf(f)
inType := []reflect.Type{typeAny}
outType := []reflect.Type{typeBool}
b, ctx_sup := checkFuncUpcast(fv, inType, outType, true)
if !b {
panic(ErrFuncFlip)
}
o = parent.newFilterObservable("firstWhile")
o.flip_accept_error = checkFuncAcceptError(fv)
o.flip_sup_ctx = ctx_sup
o.flip = fv.Interface()
o.take = 0
o.operator = firstWhileOperator
return o
}
var firstWhileOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
fv := reflect.ValueOf(o.flip)
var params = []reflect.Value{x}
rs, skip, stop, e := userFuncCall(fv, params)
var item interface{} = rs[0].Interface()
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
if b, ok := item.(bool); ok && b && o.take == 0 {
o.take++
end = o.sendToFlow(ctx, x.Interface(), out)
}
}
return
}}
// First take the first item
func (parent *Observable) First() (o *Observable) {
o = parent.newFilterObservable("first")
o.index = 0
o.operator = firstOperator
return o
}
var firstOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
o.index++
if o.index == 1 {
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// Distinct remove duplicated items
func (parent *Observable) Distinct() (o *Observable) {
o = parent.newFilterObservable("distinct")
o.operator = distinctOperator
return o
}
var distinctOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
skip, stop, e := userNext()
item := x.Interface()
if o.hashset == nil {
o.hashset = make(map[interface{}]bool)
}
_, exists := o.hashset[item]
if !exists {
o.hashset[item] = true
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
end = o.sendToFlow(ctx, item, out)
}
}
return
}}
// Filter `func(x anytype) bool` filters items in the original Observable and returns
// a new Observable with the filtered items.
func (parent *Observable) Filter(f interface{}) (o *Observable) {
// check validation of f
fv := reflect.ValueOf(f)
inType := []reflect.Type{typeAny}
outType := []reflect.Type{typeBool}
b, ctx_sup := checkFuncUpcast(fv, inType, outType, true)
if !b {
panic(ErrFuncFlip)
}
o = parent.newFilterObservable("filter")
o.flip_accept_error = checkFuncAcceptError(fv)
o.flip_sup_ctx = ctx_sup
o.flip = fv.Interface()
o.operator = filterOperater
return o
}
var filterOperater = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
fv := reflect.ValueOf(o.flip)
var params = []reflect.Value{x}
rs, skip, stop, e := userFuncCall(fv, params)
var item interface{} = rs[0].Interface()
if stop {
end = true
return
}
if skip {
return
}
if e != nil {
item = e
}
// send data
if !end {
if b, ok := item.(bool); ok && b {
end = o.sendToFlow(ctx, x.Interface(), out)
}
}
return
}}
func (parent *Observable) newFilterObservable(name string) (o *Observable) {
//new Observable
o = newObservable()
o.Name = name
//chain Observables
parent.next = o
o.pred = parent
o.root = parent.root
return o
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/hez2010/rxgo.git
git@gitee.com:hez2010/rxgo.git
hez2010
rxgo
rxgo
master

搜索帮助