RxGo: Reactive Extensions for the Go Language

EzeRinehart, 3 years ago
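<p>RxGo is a Reactive Extensions library for the Go language.</p>
<h2>Installation</h2>
<pre><code class="language-go">go get -u github.com/jochasinga/rxgo</code></pre>
<h2>Usage</h2>
<pre><code class="language-go">package main

import (
    "errors"
    "fmt"

    // Assumption: the subpackages live under the module path from the install step.
    "github.com/jochasinga/rxgo/iterable"
    "github.com/jochasinga/rxgo/observable"
    "github.com/jochasinga/rxgo/observer"
)

func main() {
    watcher := observer.Observer{

        // Register a handler function for every next available item.
        NextHandler: func(item interface{}) {
            fmt.Printf("Processing: %v\n", item)
        },

        // Register a handler for any emitted error.
        ErrHandler: func(err error) {
            fmt.Printf("Encountered error: %v\n", err)
        },

        // Register a handler when a stream is completed.
        DoneHandler: func() {
            fmt.Println("Done!")
        },
    }

    it, err := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})
    if err != nil {
        panic(err)
    }
    source := observable.From(it)
    sub := source.Subscribe(watcher)

    // wait for the async operation
    <-sub
}</code></pre>
<p>The code above will:</p>
<ul>
 <li> <p>print the format string "Processing: N" for each number in the slice, up to and including 4.</p> </li>
 <li> <p>print the error "bang" through the error handler. The trailing 5 is never processed.</p> </li>
</ul>
<p>Keep in mind that a stream calls only one of OnError or OnDone. If the stream emits an error, processing stops and OnDone is never called; conversely, a stream that completes normally never calls OnError.</p>
<p>The idea is to group all "side effects" into these handlers and let an Observer, or any EventHandler, take care of them.</p>
<pre><code class="language-go">package main

import (
    "fmt"

    // Assumption: the subpackages live under the module path from the install step.
    "github.com/jochasinga/rxgo/handlers"
    "github.com/jochasinga/rxgo/observable"
    "github.com/jochasinga/rxgo/observer"
)

func main() {

    score := 9

    // Add every emitted integer to the score.
    onNext := handlers.NextFunc(func(item interface{}) {
        if num, ok := item.(int); ok {
            score += num
        }
    })

    // Double the score once the stream completes.
    onDone := handlers.DoneFunc(func() {
        score *= 2
    })

    watcher := observer.New(onNext, onDone)

    // Create an `Observable` from a single item and subscribe to the observer.
    sub := observable.Just(1).Subscribe(watcher)
    <-sub

    fmt.Println(score) // 20
}</code></pre>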
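<p>The rule that a stream ends with either an error or a completion, never both, can be illustrated without the library. The following is a minimal sketch using only the standard library; it is not RxGo's actual implementation. The Observer struct, Subscribe, Run, and errBang are hypothetical names introduced for this illustration, and treating any item of type error as a terminating event is an assumption modeled on the behavior described above.</p>

```go
package main

import (
	"errors"
	"fmt"
)

// errBang is a sample error used to terminate a stream early (hypothetical).
var errBang = errors.New("bang")

// Observer is a hypothetical stand-in for an observer: three optional callbacks.
type Observer struct {
	NextHandler func(item interface{})
	ErrHandler  func(err error)
	DoneHandler func()
}

// Subscribe walks items on a separate goroutine and returns a channel that is
// closed once the stream has terminated, either by an error or by completion.
func Subscribe(items []interface{}, ob Observer) <-chan struct{} {
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for _, item := range items {
			if err, ok := item.(error); ok {
				// An error ends the stream: later items are skipped
				// and DoneHandler is never called.
				if ob.ErrHandler != nil {
					ob.ErrHandler(err)
				}
				return
			}
			if ob.NextHandler != nil {
				ob.NextHandler(item)
			}
		}
		// Reached only when no error was emitted.
		if ob.DoneHandler != nil {
			ob.DoneHandler()
		}
	}()
	return finished
}

// Run subscribes a recording observer and returns the events in order.
func Run(items []interface{}) []string {
	var events []string
	ob := Observer{
		NextHandler: func(item interface{}) { events = append(events, fmt.Sprintf("next:%v", item)) },
		ErrHandler:  func(err error) { events = append(events, "error:"+err.Error()) },
		DoneHandler: func() { events = append(events, "done") },
	}
	<-Subscribe(items, ob) // wait for the asynchronous walk to finish
	return events
}

func main() {
	fmt.Println(Run([]interface{}{1, 2, 3, 4, errBang, 5}))
	fmt.Println(Run([]interface{}{1, 2, 3}))
}
```

<p>Running this sketch prints [next:1 next:2 next:3 next:4 error:bang] followed by [next:1 next:2 next:3 done]. The first stream stops at the error and never reports completion; the second completes and never reports an error.</p>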