2eb660d4b7
* Moving retrieve into multiple goroutines * Make sure we pass nil errors into err channel * restore tests * incorporate locks in retrieve.go * deserialize data only if we have something to deserealize when receiving bandwidth allocation in server store * Adding logic for retrieve to be more efficient * Add channel? * hmm * implement Throttle concurrency primitive * using throttle * Remove unused variables * Egon comments addressed * Get ba total correct * Consume without waiting * incrementally increase signing size * Get downloads working with throttle * Removed logging * Make sure we handle errors properly * Fix tests > > Co-authored-by: Kaloyan <kaloyan@storj.io> * Can't Fatalf in goroutine * Add missing returns to tests * add capacity to channel, smarter allocations * rename things and don't use size as limit * replace things with sync2.Throttle * fix compilation errors * add note about security * fix ordering * Max length is actually 64 bytes for piece ID * Max length is actually 64 bytes for piece ID * fix limit * error comes from pending allocs, so no need to relog * Optimize throughput * TODO * Deleted allocation manager * Return when someone sends a smaller bandwidth allocation than the previous message * review comments
129 lines
3.2 KiB
Go
129 lines
3.2 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information
|
|
|
|
package sync2
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// Throttle implements two-sided throttling, between a consumer and producer
|
|
type Throttle struct {
|
|
mu sync.Mutex
|
|
consumer sync.Cond
|
|
producer sync.Cond
|
|
|
|
// error tracking for terminating Consume and Allocate
|
|
errs []error
|
|
|
|
// how much is available in the throttle
|
|
// consumer decreases availability and blocks when it's below zero
|
|
// producer increses availability and blocks as needed
|
|
available int64
|
|
}
|
|
|
|
// NewThrottle returns a new Throttle primitive
|
|
func NewThrottle() *Throttle {
|
|
var throttle Throttle
|
|
throttle.consumer.L = &throttle.mu
|
|
throttle.producer.L = &throttle.mu
|
|
return &throttle
|
|
}
|
|
|
|
// Consume subtracts amount from the throttle
|
|
func (throttle *Throttle) Consume(amount int64) error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
throttle.available -= amount
|
|
throttle.producer.Signal()
|
|
return throttle.combinedError()
|
|
}
|
|
|
|
// ConsumeOrWait tries to consume at most maxAmount
|
|
func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error) {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
|
|
for throttle.alive() && throttle.available <= 0 {
|
|
throttle.consumer.Wait()
|
|
}
|
|
|
|
available := throttle.available
|
|
if available > maxAmount {
|
|
available = maxAmount
|
|
}
|
|
throttle.available -= available
|
|
throttle.producer.Signal()
|
|
|
|
return available, throttle.combinedError()
|
|
}
|
|
|
|
// WaitUntilAbove waits until availability drops below limit
|
|
func (throttle *Throttle) WaitUntilAbove(limit int64) error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
for throttle.alive() && throttle.available <= limit {
|
|
throttle.consumer.Wait()
|
|
}
|
|
return throttle.combinedError()
|
|
}
|
|
|
|
// Produce adds amount to the throttle
|
|
func (throttle *Throttle) Produce(amount int64) error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
throttle.available += amount
|
|
throttle.consumer.Signal()
|
|
return throttle.combinedError()
|
|
}
|
|
|
|
// ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold
|
|
func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
throttle.available += amount
|
|
throttle.consumer.Signal()
|
|
for throttle.alive() && throttle.available >= limit {
|
|
throttle.producer.Wait()
|
|
}
|
|
return throttle.combinedError()
|
|
}
|
|
|
|
// WaitUntilBelow waits until availability drops below limit
|
|
func (throttle *Throttle) WaitUntilBelow(limit int64) error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
for throttle.alive() && throttle.available >= limit {
|
|
throttle.producer.Wait()
|
|
}
|
|
return throttle.combinedError()
|
|
}
|
|
|
|
// Fail stops both consumer and allocator
|
|
func (throttle *Throttle) Fail(err error) {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
|
|
throttle.errs = append(throttle.errs, err)
|
|
throttle.consumer.Signal()
|
|
throttle.producer.Signal()
|
|
}
|
|
|
|
// must hold mutex when calling this
|
|
func (throttle *Throttle) alive() bool { return len(throttle.errs) == 0 }
|
|
|
|
func (throttle *Throttle) combinedError() error {
|
|
if len(throttle.errs) == 0 {
|
|
return nil
|
|
}
|
|
// TODO: combine errors
|
|
return throttle.errs[0]
|
|
}
|
|
|
|
// Err returns the finishing error
|
|
func (throttle *Throttle) Err() error {
|
|
throttle.mu.Lock()
|
|
defer throttle.mu.Unlock()
|
|
return throttle.combinedError()
|
|
}
|