2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-09-10 10:18:42 +01:00
|
|
|
// See LICENSE for copying information
|
|
|
|
|
2018-12-20 14:51:39 +00:00
|
|
|
package sync2_test
|
2018-09-10 10:18:42 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"math/rand"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"testing"
|
|
|
|
"time"
|
2018-12-20 14:51:39 +00:00
|
|
|
|
|
|
|
"storj.io/storj/internal/sync2"
|
2018-09-10 10:18:42 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
func ExampleThrottle() {
|
2018-12-20 14:51:39 +00:00
|
|
|
throttle := sync2.NewThrottle()
|
2018-09-10 10:18:42 +01:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
// consumer
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
totalConsumed := int64(0)
|
|
|
|
for {
|
|
|
|
available, err := throttle.ConsumeOrWait(8)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
fmt.Println("- consuming ", available, " total=", totalConsumed)
|
|
|
|
totalConsumed += available
|
|
|
|
|
|
|
|
// do work for available amount
|
|
|
|
time.Sleep(time.Duration(available) * time.Millisecond)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// producer
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
step := int64(8)
|
|
|
|
for total := int64(64); total >= 0; total -= step {
|
|
|
|
err := throttle.ProduceAndWaitUntilBelow(step, step*3)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
fmt.Println("+ producing", step, " left=", total)
|
|
|
|
time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond)
|
|
|
|
}
|
|
|
|
|
|
|
|
throttle.Fail(io.EOF)
|
|
|
|
}()
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
fmt.Println("done", throttle.Err())
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestThrottleBasic(t *testing.T) {
|
2019-02-20 09:22:53 +00:00
|
|
|
t.Parallel()
|
|
|
|
|
2018-12-20 14:51:39 +00:00
|
|
|
throttle := sync2.NewThrottle()
|
2018-09-10 10:18:42 +01:00
|
|
|
var stage int64
|
|
|
|
c := make(chan error, 1)
|
|
|
|
|
|
|
|
// consumer
|
|
|
|
go func() {
|
|
|
|
consume, _ := throttle.ConsumeOrWait(4)
|
|
|
|
if v := atomic.LoadInt64(&stage); v != 1 || consume != 4 {
|
|
|
|
c <- fmt.Errorf("did not block in time: %d / %d", v, consume)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
consume, _ = throttle.ConsumeOrWait(4)
|
|
|
|
if v := atomic.LoadInt64(&stage); v != 1 || consume != 4 {
|
|
|
|
c <- fmt.Errorf("did not block in time: %d / %d", v, consume)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
atomic.AddInt64(&stage, 2)
|
|
|
|
c <- nil
|
|
|
|
}()
|
|
|
|
|
|
|
|
// slowly produce
|
|
|
|
time.Sleep(time.Millisecond)
|
|
|
|
// set stage to 1
|
|
|
|
atomic.AddInt64(&stage, 1)
|
|
|
|
_ = throttle.Produce(8)
|
|
|
|
// wait until consumer consumes twice
|
|
|
|
_ = throttle.WaitUntilBelow(3)
|
|
|
|
// wait slightly for updating stage
|
|
|
|
time.Sleep(time.Millisecond)
|
|
|
|
|
|
|
|
if v := atomic.LoadInt64(&stage); v != 3 {
|
|
|
|
t.Fatalf("did not unblock in time: %d", v)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := <-c; err != nil {
|
|
|
|
t.Fatalf(err.Error())
|
|
|
|
}
|
|
|
|
}
|