storj/internal/sync2/throttle.go
Alexander Leitner 2eb660d4b7 Bandwidth allocation pipeline data (#276)
* Moving retrieve into multiple goroutines

* Make sure we pass nil errors into err channel

* restore tests

* incorporate locks in retrieve.go

* deserialize data only if we have something to deserealize when receiving bandwidth allocation in server store

* Adding logic for retrieve to be more efficient

* Add channel?

* hmm

* implement Throttle concurrency primitive

* using throttle

* Remove unused variables

* Egon comments addressed

* Get ba total correct

* Consume without waiting

* incrementally increase signing size

* Get downloads working with throttle

* Removed logging

* Make sure we handle errors properly

* Fix tests
>
>
Co-authored-by: Kaloyan <kaloyan@storj.io>

* Can't Fatalf in goroutine

* Add missing returns to tests

* add capacity to channel, smarter allocations

* rename things and don't use size as limit

* replace things with sync2.Throttle

* fix compilation errors

* add note about security

* fix ordering

* Max length is actually 64 bytes for piece ID

* Max length is actually 64 bytes for piece ID

* fix limit

* error comes from pending allocs, so no need to relog

* Optimize throughput

* TODO

* Deleted allocation manager

* Return when someone sends a smaller bandwidth allocation than the previous message

* review comments
2018-09-10 03:18:41 -06:00

129 lines
3.2 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information
package sync2
import (
"sync"
)
// Throttle implements two-sided throttling, between a consumer and producer
type Throttle struct {
mu sync.Mutex
consumer sync.Cond
producer sync.Cond
// error tracking for terminating Consume and Allocate
errs []error
// how much is available in the throttle
// consumer decreases availability and blocks when it's below zero
// producer increses availability and blocks as needed
available int64
}
// NewThrottle returns a new Throttle primitive
func NewThrottle() *Throttle {
var throttle Throttle
throttle.consumer.L = &throttle.mu
throttle.producer.L = &throttle.mu
return &throttle
}
// Consume subtracts amount from the throttle
func (throttle *Throttle) Consume(amount int64) error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
throttle.available -= amount
throttle.producer.Signal()
return throttle.combinedError()
}
// ConsumeOrWait tries to consume at most maxAmount
func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error) {
throttle.mu.Lock()
defer throttle.mu.Unlock()
for throttle.alive() && throttle.available <= 0 {
throttle.consumer.Wait()
}
available := throttle.available
if available > maxAmount {
available = maxAmount
}
throttle.available -= available
throttle.producer.Signal()
return available, throttle.combinedError()
}
// WaitUntilAbove waits until availability drops below limit
func (throttle *Throttle) WaitUntilAbove(limit int64) error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
for throttle.alive() && throttle.available <= limit {
throttle.consumer.Wait()
}
return throttle.combinedError()
}
// Produce adds amount to the throttle
func (throttle *Throttle) Produce(amount int64) error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
throttle.available += amount
throttle.consumer.Signal()
return throttle.combinedError()
}
// ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold
func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
throttle.available += amount
throttle.consumer.Signal()
for throttle.alive() && throttle.available >= limit {
throttle.producer.Wait()
}
return throttle.combinedError()
}
// WaitUntilBelow waits until availability drops below limit
func (throttle *Throttle) WaitUntilBelow(limit int64) error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
for throttle.alive() && throttle.available >= limit {
throttle.producer.Wait()
}
return throttle.combinedError()
}
// Fail stops both consumer and allocator
func (throttle *Throttle) Fail(err error) {
throttle.mu.Lock()
defer throttle.mu.Unlock()
throttle.errs = append(throttle.errs, err)
throttle.consumer.Signal()
throttle.producer.Signal()
}
// must hold mutex when calling this
func (throttle *Throttle) alive() bool { return len(throttle.errs) == 0 }
func (throttle *Throttle) combinedError() error {
if len(throttle.errs) == 0 {
return nil
}
// TODO: combine errors
return throttle.errs[0]
}
// Err returns the finishing error
func (throttle *Throttle) Err() error {
throttle.mu.Lock()
defer throttle.mu.Unlock()
return throttle.combinedError()
}