Go open source: Gores - a Redis-based message queue system

k5135, 5 years ago
<h2>Gores</h2>
<p>An asynchronous job execution system based on Redis.</p>
<h2>Installation</h2>
<p>Get the package:</p>
<pre><code class="language-go">$ go get github.com/wang502/gores/gores</code></pre>
<p>Import the package:</p>
<pre><code class="language-go">import "github.com/wang502/gores/gores"</code></pre>
<h2>Usage</h2>
<h3>Configuration</h3>
<p>Add a config.json file to your project folder:</p>
<pre><code class="language-go">{
  "REDISURL": "",
  "REDIS_PW": "mypassword",
  "BLPOP_MAX_BLOCK_TIME": 1,
  "MAX_WORKERS": 2,
  "Queues": ["queue1", "queue2"],
  "DispatcherTimeout": 5,
  "WorkerTimeout": 5
}</code></pre>
<ul>
<li><strong><em>REDISURL</em> </strong> : Redis server address. If you run a local Redis, the default host is</li>
<li><strong><em>REDIS_PW</em> </strong> : Redis password. If no password is set on the server, this can be any string.</li>
<li><strong><em>BLPOP_MAX_BLOCK_TIME</em> </strong> : Maximum time to block when calling the Redis BLPOP command.</li>
<li><strong><em>MAX_WORKERS</em> </strong> : Maximum number of concurrent workers. Each worker is a separate goroutine that executes a specific task on the fetched item.</li>
<li><strong><em>Queues</em> </strong> : Array of queue names on the Redis message broker.</li>
<li><strong><em>DispatcherTimeout</em> </strong> : How long the dispatcher waits for a new job to dispatch before quitting.</li>
<li><strong><em>WorkerTimeout</em> </strong> : How long a worker waits for a new job to process before quitting.</li>
</ul>
<p>Initialize the config:</p>
<pre><code class="language-go">configPath := flag.String("c", "config.json", "path to configuration file")
flag.Parse()
config, err := gores.InitConfig(*configPath)
if err != nil {
    log.Fatalf("Cannot read config file: %v", err)
}</code></pre>
<h3>Enqueue item to Redis queue</h3>
<p>An item is a Go map.
It must contain the following keys:</p>
<ul>
<li><strong><em>Name</em> </strong> : name of the item to enqueue. Items with different names are mapped to different tasks.</li>
<li><strong><em>Queue</em> </strong> : name of the queue to put the item in.</li>
<li><strong><em>Args</em> </strong> : the arguments the workers need in order to execute the task.</li>
<li><strong><em>Enqueue_timestamp</em> </strong> : the Unix timestamp of when the item is enqueued.</li>
</ul>
<pre><code class="language-go">resq := gores.NewResQ(config)
item := map[string]interface{}{
    "Name":  "Rectangle",
    "Queue": "TestJob",
    "Args": map[string]interface{}{
        "Length": 10,
        "Width":  10,
    },
    "Enqueue_timestamp": time.Now().Unix(),
}

err = resq.Enqueue(item)
if err != nil {
    log.Fatalf("ERROR Enqueue item to ResQ: %v", err)
}</code></pre>
<pre><code class="language-go">$ go run main.go -c ./config.json -o produce</code></pre>
<h3>Define tasks</h3>
<pre><code class="language-go">package tasks

import (
    "errors"
    "fmt"
)

// CalculateArea is the task for items with 'Name' = 'Rectangle'.
// It calculates the area of a rectangle by multiplying Length by Width.
func CalculateArea(args map[string]interface{}) error {
    length := args["Length"]
    width := args["Width"]
    if length == nil || width == nil {
        return errors.New("Map has no required attributes")
    }
    // Both values are asserted as float64; any other type makes the assertion panic.
    fmt.Printf("The area is %d\n", int(length.(float64))*int(width.(float64)))
    return nil
}</code></pre>
<h3>Launch workers to consume items and execute tasks</h3>
<pre><code class="language-go">tasks := map[string]interface{}{
    "Item":      tasks.PrintItem,
    "Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)</code></pre>
<pre><code class="language-go">$ go run main.go -c ./config.json -o consume</code></pre>
<p>The output will
be:</p>
<pre><code class="language-go">The area is 100</code></pre>
<h3>Info about processed/failed jobs</h3>
<pre><code class="language-go">resq := gores.NewResQ(config)
if resq == nil {
    log.Fatalf("resq is nil")
}
info := resq.Info()
for k, v := range info {
    // Pick the format verb that matches the value's dynamic type.
    switch v.(type) {
    case string:
        fmt.Printf("%s : %s\n", k, v)
    case int, int64:
        fmt.Printf("%s : %d\n", k, v)
    }
}</code></pre>
<p>The output will be:</p>
<pre><code class="language-go">Gores Info:
queues : 2
workers : 0
failed : 0
host :
pending : 0
processed : 1</code></pre>
<h2>Contribution</h2>
<p>Please feel free to suggest new features. Pull requests are also welcome!</p>
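<p>A note on the CalculateArea task shown earlier: Length and Width are enqueued as Go ints, yet the task asserts them as float64. That is consistent with items being JSON-encoded on their way through Redis, because Go's encoding/json decodes every JSON number into float64 when the target is an interface{}. The sketch below illustrates this with the standard library only. It assumes (the text above does not state it) that items are serialized as JSON, and roundTrip, toInt and area are hypothetical helpers written for illustration, not part of the gores API.</p>

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// roundTrip encodes a map to JSON and decodes it back into a generic map.
// Assumption: this mimics how an item's Args travel through the queue.
func roundTrip(item map[string]interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(item)
	if err != nil {
		return nil, err
	}
	var decoded map[string]interface{}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

// toInt (hypothetical helper) converts the numeric types a task may receive
// into an int without panicking on an unexpected type.
func toInt(v interface{}) (int, error) {
	switch n := v.(type) {
	case float64:
		return int(n), nil
	case int:
		return n, nil
	case int64:
		return int(n), nil
	default:
		return 0, fmt.Errorf("expected a number, got %T", v)
	}
}

// area (hypothetical helper) multiplies Length by Width and reports
// missing or non-numeric arguments as errors.
func area(args map[string]interface{}) (int, error) {
	length, okL := args["Length"]
	width, okW := args["Width"]
	if !okL || !okW {
		return 0, errors.New("item is missing Length or Width")
	}
	l, err := toInt(length)
	if err != nil {
		return 0, err
	}
	w, err := toInt(width)
	if err != nil {
		return 0, err
	}
	return l * w, nil
}

func main() {
	args := map[string]interface{}{"Length": 10, "Width": 10}
	decoded, err := roundTrip(args)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// Prints "before: int, after: float64".
	fmt.Printf("before: %T, after: %T\n", args["Length"], decoded["Length"])

	a, err := area(decoded)
	if err != nil {
		fmt.Println("area failed:", err)
		return
	}
	fmt.Println("The area is", a) // prints "The area is 100"
}
```

<p>Using a type switch instead of a bare type assertion means a task written this way returns an error for a malformed item rather than panicking inside the worker goroutine.</p>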