diff --git a/core/mapreduce/mapreduce.go b/core/mr/mapreduce.go similarity index 98% rename from core/mapreduce/mapreduce.go rename to core/mr/mapreduce.go index c2c6ee95..f563b5bf 100644 --- a/core/mapreduce/mapreduce.go +++ b/core/mr/mapreduce.go @@ -1,4 +1,4 @@ -package mapreduce +package mr import ( "errors" @@ -16,7 +16,7 @@ const ( minWorkers = 1 ) -var ErrCancelWithNil = errors.New("mapreduce cancelled with nil") +var ErrCancelWithNil = errors.New("mr cancelled with nil") type ( GenerateFunc func(source chan<- interface{}) diff --git a/core/mapreduce/mapreduce_test.go b/core/mr/mapreduce_test.go similarity index 99% rename from core/mapreduce/mapreduce_test.go rename to core/mr/mapreduce_test.go index 153ff38e..50abd150 100644 --- a/core/mapreduce/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -1,4 +1,4 @@ -package mapreduce +package mr import ( "errors" diff --git a/example/mapreduce/countfunc/countfunc.go b/example/mapreduce/countfunc/countfunc.go index beeff375..0fea993b 100644 --- a/example/mapreduce/countfunc/countfunc.go +++ b/example/mapreduce/countfunc/countfunc.go @@ -14,7 +14,7 @@ import ( "sync/atomic" "time" - "zero/core/mapreduce" + "zero/core/mr" "github.com/google/gops/agent" ) @@ -52,7 +52,7 @@ func enumerateLines(filename string) chan string { return output } -func mapper(filename interface{}, writer mapreduce.Writer, cancel func(error)) { +func mapper(filename interface{}, writer mr.Writer, cancel func(error)) { if len(*stopOnFile) > 0 && path.Base(filename.(string)) == *stopOnFile { fmt.Printf("Stop on file: %s\n", *stopOnFile) cancel(errors.New("stop on file")) @@ -80,7 +80,7 @@ func mapper(filename interface{}, writer mapreduce.Writer, cancel func(error)) { writer.Write(result) } -func reducer(input <-chan interface{}, writer mapreduce.Writer, cancel func(error)) { +func reducer(input <-chan interface{}, writer mr.Writer, cancel func(error)) { var result int for count := range input { @@ -110,7 +110,7 @@ func main() { fmt.Println("Processing, please wait...") start := time.Now() - result, err := mapreduce.MapReduce(func(source chan<- interface{}) { + result, err := mr.MapReduce(func(source chan<- interface{}) { filepath.Walk(*dir, func(fpath string, f os.FileInfo, err error) error { if !f.IsDir() && path.Ext(fpath) == ".go" { source <- fpath diff --git a/example/mapreduce/finishvoid/finishvoid.go b/example/mapreduce/finishvoid/finishvoid.go index 854b2c01..b1a3e960 100644 --- a/example/mapreduce/finishvoid/finishvoid.go +++ b/example/mapreduce/finishvoid/finishvoid.go @@ -4,14 +4,14 @@ import ( "fmt" "time" - "zero/core/mapreduce" + "zero/core/mr" "zero/core/timex" ) func main() { start := timex.Now() - mapreduce.FinishVoid(func() { + mr.FinishVoid(func() { time.Sleep(time.Second) }, func() { time.Sleep(time.Second * 5) @@ -20,7 +20,7 @@ func main() { }, func() { time.Sleep(time.Second * 6) }, func() { - if err := mapreduce.Finish(func() error { + if err := mr.Finish(func() error { time.Sleep(time.Second) return nil }, func() error { diff --git a/example/mapreduce/flatmap/flatmap.go b/example/mapreduce/flatmap/flatmap.go index afed5bf5..087e33f5 100644 --- a/example/mapreduce/flatmap/flatmap.go +++ b/example/mapreduce/flatmap/flatmap.go @@ -3,7 +3,7 @@ package main import ( "fmt" - "zero/core/mapreduce" + "zero/core/mr" ) var ( @@ -18,13 +18,13 @@ var ( func main() { var allFriends []string - for v := range mapreduce.Map(func(source chan<- interface{}) { + for v := range mr.Map(func(source chan<- interface{}) { for _, each := range persons { source <- each } - }, func(item interface{}, writer mapreduce.Writer) { + }, func(item interface{}, writer mr.Writer) { writer.Write(friends[item.(string)]) - }, mapreduce.WithWorkers(100)) { + }, mr.WithWorkers(100)) { allFriends = append(allFriends, v.([]string)...) } fmt.Println(allFriends) diff --git a/example/mapreduce/goroutineleak/leak.go b/example/mapreduce/goroutineleak/leak.go index 0a6e43dc..a4d9d2f2 100644 --- a/example/mapreduce/goroutineleak/leak.go +++ b/example/mapreduce/goroutineleak/leak.go @@ -10,7 +10,7 @@ import ( "zero/core/lang" "zero/core/logx" - "zero/core/mapreduce" + "zero/core/mr" "zero/core/proc" ) @@ -47,17 +47,17 @@ func main() { case <-done: return default: - mapreduce.MapReduce(func(source chan<- interface{}) { + mr.MapReduce(func(source chan<- interface{}) { for i := 0; i < 100; i++ { source <- i } - }, func(item interface{}, writer mapreduce.Writer, cancel func(error)) { + }, func(item interface{}, writer mr.Writer, cancel func(error)) { if item.(int) == 40 { cancel(errors.New("any")) return } writer.Write(item) - }, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) { + }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) { list := make([]int, 0) for p := range pipe { list = append(list, p.(int)) diff --git a/example/mapreduce/irregular/irregular.go b/example/mapreduce/irregular/irregular.go index 3cafa6e1..026c0e1d 100644 --- a/example/mapreduce/irregular/irregular.go +++ b/example/mapreduce/irregular/irregular.go @@ -4,15 +4,15 @@ import ( "fmt" "time" - "zero/core/mapreduce" + "zero/core/mr" ) func main() { - mapreduce.MapReduceVoid(func(source chan<- interface{}) { + mr.MapReduceVoid(func(source chan<- interface{}) { for i := 0; i < 10; i++ { source <- i } - }, func(item interface{}, writer mapreduce.Writer, cancel func(error)) { + }, func(item interface{}, writer mr.Writer, cancel func(error)) { i := item.(int) if i == 0 { time.Sleep(10 * time.Second) diff --git a/tools/goctl/goctl.go b/tools/goctl/goctl.go index 3ac594f6..3374933d 100644 --- a/tools/goctl/goctl.go +++ b/tools/goctl/goctl.go @@ -15,7 +15,7 @@ import ( "zero/core/hash" "zero/core/lang" "zero/core/logx" - "zero/core/mapreduce" + "zero/core/mr" "zero/core/stringx" "zero/tools/goctl/api/apigen" "zero/tools/goctl/api/dartgen" @@ -345,7 +345,7 @@ func main() { logx.Disable() done := make(chan lang.PlaceholderType) - mapreduce.FinishVoid(func() { + mr.FinishVoid(func() { if os.Getenv(autoUpdate) != "off" && !stringx.Contains(os.Args, "-iu") { update() }