From df37597ac3b8c6514fb4574f7027aa8f13d41c04 Mon Sep 17 00:00:00 2001 From: kevin Date: Wed, 16 Sep 2020 16:48:51 +0800 Subject: [PATCH] simplify mapreduce code --- core/mr/mapreduce.go | 21 +++++---------------- readme.md | 1 + 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index 4c7e6a45..15d1f439 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -79,7 +79,7 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} collector := make(chan interface{}, options.workers) done := syncx.NewDoneChan() - go mapDispatcher(mapper, source, collector, done.Done(), options.workers) + go executeMappers(mapper, source, collector, done.Done(), options.workers) return collector } @@ -126,7 +126,10 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R reducer(collector, writer, cancel) drain(collector) }() - go mapperDispatcher(mapper, source, collector, done.Done(), cancel, options.workers) + + go executeMappers(func(item interface{}, writer Writer) { + mapper(item, writer, cancel) + }, source, collector, done.Done(), options.workers) value, ok := <-output if err := retErr.Load(); err != nil { @@ -226,20 +229,6 @@ func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- i } } -func mapDispatcher(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, - done <-chan lang.PlaceholderType, workers int) { - executeMappers(func(item interface{}, writer Writer) { - mapper(item, writer) - }, input, collector, done, workers) -} - -func mapperDispatcher(mapper MapperFunc, input <-chan interface{}, collector chan<- interface{}, - done <-chan lang.PlaceholderType, cancel func(error), workers int) { - executeMappers(func(item interface{}, writer Writer) { - mapper(item, writer, cancel) - }, input, collector, done, workers) -} - func newOptions() *mapReduceOptions { return &mapReduceOptions{ workers: defaultWorkers, diff --git a/readme.md b/readme.md index 42987a77..dc10287b 100644 --- a/readme.md +++ b/readme.md @@ -157,6 +157,7 @@ go get -u github.com/tal-tech/go-zero * [通过MapReduce降低服务响应时间](doc/mapreduce.md) * [关键字替换和敏感词过滤工具](doc/keywords.md) * [进程内缓存使用方法](doc/collection.md) +* [防止缓存击穿之进程内共享调用](doc/sharedcalls.md) * [基于prometheus的微服务指标监控](doc/metric.md) * [文本序列化和反序列化](doc/mapping.md)