Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Jan 3, 2016
1 parent 43db639 commit 649f4e8
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 135 deletions.
7 changes: 4 additions & 3 deletions collections/queue.go
Expand Up @@ -3,6 +3,7 @@ package collections
import (
"errors"
"fmt"
"github.com/jolestar/go-commons-pool/concurrent"
"sync"
"time"
)
Expand Down Expand Up @@ -64,18 +65,18 @@ type LinkedBlockDeque struct {
lock *sync.Mutex

/** Condition for waiting takes */
notEmpty *TimeoutCond
notEmpty *concurrent.TimeoutCond

/** Condition for waiting puts */
notFull *TimeoutCond
notFull *concurrent.TimeoutCond
}

func NewDeque(capacity int) *LinkedBlockDeque {
if capacity < 0 {
panic(errors.New("capacity must > 0"))
}
lock := new(sync.Mutex)
return &LinkedBlockDeque{capacity: capacity, lock: lock, notEmpty: NewTimeoutCond(lock), notFull: NewTimeoutCond(lock)}
return &LinkedBlockDeque{capacity: capacity, lock: lock, notEmpty: concurrent.NewTimeoutCond(lock), notFull: concurrent.NewTimeoutCond(lock)}
}

/**
Expand Down
12 changes: 6 additions & 6 deletions collections/queue_test.go
Expand Up @@ -2,11 +2,11 @@ package collections

import (
"fmt"
"github.com/jolestar/go-commons-pool/concurrent"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -191,19 +191,19 @@ func (this *LinkedBlockDequeTestSuite) TestIteratorRemove() {
endWait := sync.WaitGroup{}
endWait.Add(count + 1)

counts := make(map[int]int32, count)
var hasErr int32 = 0
counts := make(map[int]concurrent.AtomicInteger, count)
var hasErr concurrent.AtomicInteger = concurrent.AtomicInteger(0)
for i := 0; i < count; i++ {
go func(idx int) {
startWait.Wait()
iterator := this.deque.Iterator()
for iterator.HasNext() {
item := iterator.Next()
if item == nil {
hasErr = atomic.AddInt32(&hasErr, int32(1))
hasErr.IncrementAndGet()
} else {
c := counts[idx]
counts[idx] = atomic.AddInt32(&c, int32(1))
c.IncrementAndGet()
}
}
endWait.Done()
Expand Down Expand Up @@ -234,7 +234,7 @@ func (this *LinkedBlockDequeTestSuite) TestIteratorRemove() {
//fmt.Println("counts:", counts)
assert.Equal(this.T(), count/2, this.deque.Size())
assert.Equal(this.T(), count/2, len(list))
assert.Equal(this.T(), int32(0), hasErr)
assert.Equal(this.T(), int32(0), hasErr.Get())
}

func (this *LinkedBlockDequeTestSuite) TestQueueLock() {
Expand Down
2 changes: 1 addition & 1 deletion atomic.go → concurrent/atomic.go
@@ -1,4 +1,4 @@
package pool
package concurrent

import "sync/atomic"

Expand Down
2 changes: 1 addition & 1 deletion atomic_test.go → concurrent/atomic_test.go
@@ -1,4 +1,4 @@
package pool
package concurrent

import (
"github.com/stretchr/testify/assert"
Expand Down
8 changes: 6 additions & 2 deletions collections/cond.go → concurrent/cond.go
@@ -1,4 +1,4 @@
package collections
package concurrent

import (
"sync"
Expand Down Expand Up @@ -36,9 +36,11 @@ func (this *TimeoutCond) WaitWithTimeout(timeout time.Duration) (time.Duration,
return is interrupt
*/
func (this *TimeoutCond) Wait() bool {
//copy signal in lock, avoid data race with Interrupt
ch := this.signal
this.L.Unlock()
defer this.L.Lock()
_, ok := <-this.signal
_, ok := <-ch
return !ok
}

Expand All @@ -50,6 +52,8 @@ func (this *TimeoutCond) Signal() {
}

func (this *TimeoutCond) Interrupt() {
this.L.Lock()
defer this.L.Unlock()
close(this.signal)
this.signal = make(chan int, 0)
}
2 changes: 1 addition & 1 deletion collections/cond_test.go → concurrent/cond_test.go
@@ -1,4 +1,4 @@
package collections
package concurrent

import (
"fmt"
Expand Down
13 changes: 7 additions & 6 deletions pool.go
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/jolestar/go-commons-pool/collections"
"github.com/jolestar/go-commons-pool/concurrent"
"math"
"sync"
"time"
Expand Down Expand Up @@ -46,9 +47,9 @@ type ObjectPool struct {
idleObjects *collections.LinkedBlockDeque
allObjects *collections.SyncIdentityMap
factory PooledObjectFactory
createCount AtomicInteger
destroyedByEvictorCount AtomicInteger
destroyedCount AtomicInteger
createCount concurrent.AtomicInteger
destroyedByEvictorCount concurrent.AtomicInteger
destroyedCount concurrent.AtomicInteger
evictor *time.Ticker
evictionIterator collections.Iterator
}
Expand All @@ -57,9 +58,9 @@ func NewObjectPool(factory PooledObjectFactory, config *ObjectPoolConfig) *Objec
pool := ObjectPool{factory: factory, Config: config,
idleObjects: collections.NewDeque(math.MaxInt32),
allObjects: collections.NewSyncMap(),
createCount: AtomicInteger(0),
destroyedByEvictorCount: AtomicInteger(0),
destroyedCount: AtomicInteger(0)}
createCount: concurrent.AtomicInteger(0),
destroyedByEvictorCount: concurrent.AtomicInteger(0),
destroyedCount: concurrent.AtomicInteger(0)}
pool.StartEvictor()
return &pool
}
Expand Down
94 changes: 50 additions & 44 deletions pool_test.go
Expand Up @@ -3,6 +3,7 @@ package pool
import (
"errors"
"fmt"
"github.com/jolestar/go-commons-pool/concurrent"
"github.com/stretchr/testify/suite"
"math"
"math/rand"
Expand Down Expand Up @@ -473,7 +474,7 @@ func (this *PoolTestSuite) TestEvictWhileEmpty() {
this.pool.evict()
}

type TestRunnable struct {
type TestThreadArg struct {
/** pool to borrow from */
pool *ObjectPool

Expand All @@ -491,60 +492,64 @@ type TestRunnable struct {

/** object expected to be borrowed (fail otherwise) */
expectedObject interface{}
}

type TestThreadResult struct {
complete bool
failed bool
error error
}

func NewTestRunnableSimple(pool *ObjectPool, iter int, delay int, randomDelay bool) *TestRunnable {
return NewTestRunnable(pool, iter, delay, delay, randomDelay, nil)
func NewTesThreadArgSimple(pool *ObjectPool, iter int, delay int, randomDelay bool) *TestThreadArg {
return NewTestThreadArg(pool, iter, delay, delay, randomDelay, nil)
}

func NewTestRunnable(pool *ObjectPool, iter int, startDelay int,
holdTime int, randomDelay bool, obj interface{}) *TestRunnable {
return &TestRunnable{pool: pool, iter: iter, startDelay: startDelay, holdTime: holdTime, randomDelay: randomDelay, expectedObject: obj}
func NewTestThreadArg(pool *ObjectPool, iter int, startDelay int,
holdTime int, randomDelay bool, obj interface{}) *TestThreadArg {
return &TestThreadArg{pool: pool, iter: iter, startDelay: startDelay, holdTime: holdTime, randomDelay: randomDelay, expectedObject: obj}
}

func (this *TestRunnable) Run() {
for i := 0; i < this.iter; i++ {
func threadRun(arg *TestThreadArg, resultChan chan TestThreadResult) {
result := TestThreadResult{}
for i := 0; i < arg.iter; i++ {
var startDelay int
if this.randomDelay {
startDelay = int(rand.Int31n(int32(this.startDelay)))
if arg.randomDelay {
startDelay = int(rand.Int31n(int32(arg.startDelay)))
} else {
startDelay = this.startDelay
startDelay = arg.startDelay
}
var holdTime int
if this.randomDelay {
holdTime = int(rand.Int31n(int32(this.holdTime)))
if arg.randomDelay {
holdTime = int(rand.Int31n(int32(arg.holdTime)))
} else {
holdTime = this.holdTime
holdTime = arg.holdTime
}
time.Sleep(time.Duration(startDelay) * time.Millisecond)
obj, err := this.pool.BorrowObject()
obj, err := arg.pool.BorrowObject()
if err != nil {
this.error = err
this.failed = true
this.complete = true
result.error = err
result.failed = true
result.complete = true
break
}

if this.expectedObject != nil && !(this.expectedObject == obj) {
this.error = fmt.Errorf("Expected: %v found: %v", this.expectedObject, obj)
this.failed = true
this.complete = true
if arg.expectedObject != nil && !(arg.expectedObject == obj) {
result.error = fmt.Errorf("Expected: %v found: %v", arg.expectedObject, obj)
result.failed = true
result.complete = true
break
}
time.Sleep(time.Duration(holdTime) * time.Millisecond)
err = this.pool.ReturnObject(obj)
err = arg.pool.ReturnObject(obj)
if err != nil {
this.error = err
this.failed = true
this.complete = true
result.error = err
result.failed = true
result.complete = true
break
}
}
this.complete = true
result.complete = true
resultChan <- result
}

func (this *PoolTestSuite) TestEvictAddObjects() {
Expand All @@ -556,14 +561,16 @@ func (this *PoolTestSuite) TestEvictAddObjects() {
this.pool.BorrowObject() // numActive = 1, numIdle = 0
// Create a test thread that will run once and try a borrow after
// 150ms fixed delay
borrower := NewTestRunnableSimple(this.pool, 1, 150, false)
borrowerThread := NewThreadWithRunnable(borrower)
borrower := NewTesThreadArgSimple(this.pool, 1, 150, false)
//// Set evictor to run in 100 ms - will create idle instance
this.pool.Config.TimeBetweenEvictionRunsMillis = int64(100)
borrowerThread.Start() // Off to the races
borrowerThread.Join()
fmt.Printf("TestEvictAddObjects %v error:%v", borrower, borrower.error)
this.True(!borrower.failed)
ch := make(chan TestThreadResult)
go threadRun(borrower, ch)
//borrowerThread.Start() // Off to the races
//borrowerThread.Join()
result := <-ch
fmt.Printf("TestEvictAddObjects %v error:%v", borrower, result.error)
this.True(!result.failed)
}

func (this *PoolTestSuite) TestEvictLIFO() {
Expand Down Expand Up @@ -827,14 +834,15 @@ func (this *PoolTestSuite) TestMaxTotalUnderLoad() {
this.pool.Config.TimeBetweenEvictionRunsMillis = int64(-1)

// Start threads to borrow objects
threads := make([]*TestRunnable, numThreads)
threadArgs := make([]*TestThreadArg, numThreads)
resultChans := make([]chan TestThreadResult, numThreads)
for i := 0; i < numThreads; i++ {
// Factor of 2 on iterations so main thread does work whilst other
// threads are running. Factor of 2 on delay so average delay for
// other threads == actual delay for main thread
threads[i] = NewTestRunnableSimple(this.pool, numIter*2, delay*2, true)
t := NewThreadWithRunnable(threads[i])
t.Start()
threadArgs[i] = NewTesThreadArgSimple(this.pool, numIter*2, delay*2, true)
resultChans[i] = make(chan TestThreadResult)
go threadRun(threadArgs[i], resultChans[i])
}
// Give the threads a chance to start doing some work
time.Sleep(time.Duration(5000) * time.Millisecond)
Expand All @@ -856,11 +864,9 @@ func (this *PoolTestSuite) TestMaxTotalUnderLoad() {
}

for i := 0; i < numThreads; i++ {
for !(threads[i]).complete {
time.Sleep(time.Duration(500) * time.Millisecond)
}
if threads[i].failed {
this.Fail("Thread %v failed: %v", i, threads[i].error.Error())
result := <-resultChans[i]
if result.failed {
this.Fail("Thread %v failed: %v", i, result.error.Error())
}
}
}
Expand Down Expand Up @@ -985,7 +991,7 @@ func (this *PoolTestSuite) TestEviction() {
}

type TestEvictionPolicy struct {
callCount AtomicInteger
callCount concurrent.AtomicInteger
}

func (this *TestEvictionPolicy) Evict(config *EvictionConfig, underTest *PooledObject, idleCount int) bool {
Expand Down Expand Up @@ -1068,7 +1074,7 @@ func (this *PoolTestSuite) TestEvictionInvalid() {
func() (interface{}, error) {
return &TestObject{}, nil
}, nil, func(object *PooledObject) bool {
fmt.Printf("TestEvictionInvalid valid object %v \n", object)
//fmt.Printf("TestEvictionInvalid valid object %v \n", object)
time.Sleep(time.Duration(1000) * time.Millisecond)
return false
}, nil, nil))
Expand Down
39 changes: 0 additions & 39 deletions thread.go

This file was deleted.

0 comments on commit 649f4e8

Please sign in to comment.