internal/sync2: make Fence accept context (#3393)

Egon Elbre 2019-10-28 16:04:31 +02:00 committed by GitHub
parent 366da85e0a
commit 93353df4d6
9 changed files with 103 additions and 85 deletions

View File

@@ -146,7 +146,10 @@ func networkTest(flags *Flags, command string, args []string) error {
 	processes.Start(ctx, &group, "run")
 
 	for _, process := range processes.List {
-		process.Status.Started.Wait()
+		process.Status.Started.Wait(ctx)
 	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
 
 	cmd := exec.CommandContext(ctx, command, args...)
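Note that the loop above discards Wait's boolean result; the single ctx.Err() check after the loop reports cancellation once rather than per process. A minimal sketch of the same pattern, assuming a plain slice of fences (the helper and its name are hypothetical):

package example

import (
	"context"

	"storj.io/storj/internal/sync2"
)

// waitAll mirrors the loop above: each Wait result is ignored, and a
// single ctx.Err() check afterwards reports cancellation exactly once.
func waitAll(ctx context.Context, fences []*sync2.Fence) error {
	for _, fence := range fences {
		fence.Wait(ctx) // returns false as soon as ctx is canceled
	}
	return ctx.Err() // non-nil only if the context was canceled
}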

View File

@@ -176,7 +176,9 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) {
 
 	// wait for dependencies to start
 	for _, fence := range process.Wait {
-		fence.Wait()
+		if !fence.Wait(ctx) {
+			return ctx.Err()
+		}
 	}
 
 	// in case we have an explicit delay then sleep
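Here, by contrast, each dependency is checked individually and Exec fails fast. Wait can only return false via the ctx.Done() branch, so ctx.Err() is guaranteed non-nil at that point and the function never silently returns nil. A hypothetical helper capturing that invariant:

package example

import (
	"context"

	"storj.io/storj/internal/sync2"
)

// waitOrErr wraps the loop body above: a false result from Wait
// implies the context is done, so ctx.Err() is context.Canceled or
// context.DeadlineExceeded, never nil.
func waitOrErr(ctx context.Context, fence *sync2.Fence) error {
	if !fence.Wait(ctx) {
		return ctx.Err()
	}
	return nil
}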

View File

@@ -4,75 +4,62 @@
 package sync2
 
 import (
-	"runtime"
+	"context"
 	"sync"
-	"sync/atomic"
 )
 
 // Fence allows to wait for something to happen.
 type Fence struct {
-	status uint32
-	wait   sync.Mutex
+	setup   sync.Once
+	release sync.Once
+	done    chan struct{}
 }
 
-/*
-General flow of the fence:
-
-init:
-	first arriver caller to `init` will setup a lock in `wait`
-
-Wait callers:
-	try to lock/unlock in Wait, this will block until the initial lock will be released
-
-Release caller:
-	first caller will release the initial lock
-*/
-
-const (
-	statusUninitialized = iota
-	statusInitializing
-	statusBlocked
-	statusReleased
-)
-
 // init sets up the initial lock into wait
 func (fence *Fence) init() {
-	// wait for initialization
-	for atomic.LoadUint32(&fence.status) <= statusInitializing {
-		// first arriver sets up lock
-		if atomic.CompareAndSwapUint32(&fence.status, statusUninitialized, statusInitializing) {
-			fence.wait.Lock()
-			atomic.StoreUint32(&fence.status, statusBlocked)
-		} else {
-			runtime.Gosched()
-		}
-	}
-}
-
-// Wait waits for wait to be unlocked
-func (fence *Fence) Wait() {
-	// fast-path
-	if fence.Released() {
-		return
-	}
-	fence.init()
-	// start waiting on the initial lock to be released
-	fence.wait.Lock()
-	// intentionally empty critical section to wait for Release
-	//nolint
-	fence.wait.Unlock()
-}
-
-// Released returns whether the fence has been released.
-func (fence *Fence) Released() bool {
-	return atomic.LoadUint32(&fence.status) >= statusReleased
+	fence.setup.Do(func() {
+		fence.done = make(chan struct{})
+	})
 }
 
 // Release releases everyone from Wait
 func (fence *Fence) Release() {
 	fence.init()
-	// the first one releases the status
-	if atomic.CompareAndSwapUint32(&fence.status, statusBlocked, statusReleased) {
-		fence.wait.Unlock()
-	}
+	fence.release.Do(func() { close(fence.done) })
 }
+
+// Wait waits for wait to be unlocked.
+// Returns true when it was successfully released.
+func (fence *Fence) Wait(ctx context.Context) bool {
+	fence.init()
+	select {
+	case <-fence.done:
+		return true
+	default:
+		select {
+		case <-ctx.Done():
+			return false
+		case <-fence.done:
+			return true
+		}
+	}
+}
+
+// Released returns whether the fence has been released.
+func (fence *Fence) Released() bool {
+	fence.init()
+	select {
+	case <-fence.done:
+		return true
+	default:
+		return false
+	}
+}
+
+// Done returns channel that will be closed when the fence is released.
+func (fence *Fence) Done() chan struct{} {
+	fence.init()
+	return fence.done
+}
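The rewrite trades the spin-and-mutex scheme for a lazily created channel: init uses sync.Once so the zero value of Fence is usable without a constructor, Release closes the channel under a second sync.Once so repeated calls are safe, and Wait's nested select checks the done channel first, so a released fence reports true even when the caller's context is already canceled. A short usage sketch (not part of the commit):

package example

import (
	"context"
	"fmt"
	"time"

	"storj.io/storj/internal/sync2"
)

func example() {
	// The zero value is ready to use; init lazily creates the channel.
	var fence sync2.Fence

	go func() {
		time.Sleep(10 * time.Millisecond)
		fence.Release() // only the first call closes the channel
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if fence.Wait(ctx) {
		fmt.Println("released")
	} else {
		fmt.Println("canceled:", ctx.Err())
	}

	// Done exposes the channel for use in a caller's own select.
	select {
	case <-fence.Done():
		fmt.Println("already released")
	default:
		fmt.Println("still blocked")
	}
}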

View File

@@ -4,6 +4,7 @@
 package sync2_test
 
 import (
+	"context"
 	"errors"
 	"sync/atomic"
 	"testing"
@@ -12,18 +13,24 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	"storj.io/storj/internal/sync2"
+	"storj.io/storj/internal/testcontext"
 )
 
 func TestFence(t *testing.T) {
 	t.Parallel()
 
+	ctx := testcontext.NewWithTimeout(t, 30*time.Second)
+	defer ctx.Cleanup()
+
 	var group errgroup.Group
 	var fence sync2.Fence
 	var done int32
 
 	for i := 0; i < 10; i++ {
 		group.Go(func() error {
-			fence.Wait()
+			if !fence.Wait(ctx) {
+				return errors.New("got false from Wait")
+			}
 			if atomic.LoadInt32(&done) == 0 {
 				return errors.New("fence not yet released")
 			}
@@ -46,3 +53,33 @@ func TestFence(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestFence_ContextCancel(t *testing.T) {
+	t.Parallel()
+
+	tctx := testcontext.NewWithTimeout(t, 30*time.Second)
+	defer tctx.Cleanup()
+
+	ctx, cancel := context.WithCancel(tctx)
+
+	var group errgroup.Group
+	var fence sync2.Fence
+
+	for i := 0; i < 10; i++ {
+		group.Go(func() error {
+			if fence.Wait(ctx) {
+				return errors.New("got true from Wait")
+			}
+			return nil
+		})
+	}
+
+	// wait a bit for all goroutines to hit the fence
+	time.Sleep(100 * time.Millisecond)
+
+	cancel()
+
+	if err := group.Wait(); err != nil {
+		t.Fatal(err)
+	}
+}
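A subtlety these tests leave implicit: because Wait tries the done channel before it ever looks at ctx.Done(), a fence that is already released wins over a context that is already canceled. A hypothetical check (not part of the commit) illustrating that ordering:

package example

import (
	"context"

	"storj.io/storj/internal/sync2"
)

// releasedWinsOverCanceled returns true: the outer select in Wait
// sees the closed done channel on its fast path and never reaches
// the ctx.Done() case.
func releasedWinsOverCanceled() bool {
	var fence sync2.Fence
	fence.Release()

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // ctx is already canceled before Wait is called

	return fence.Wait(ctx)
}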

View File

@@ -85,8 +85,10 @@ func (srv *Service) Run(ctx context.Context) (err error) {
 }
 
 // IsAllowed returns whether if the Service is allowed to operate or not
-func (srv *Service) IsAllowed() bool {
-	srv.checked.Wait()
+func (srv *Service) IsAllowed(ctx context.Context) bool {
+	if !srv.checked.Wait(ctx) {
+		return false
+	}
 	srv.mu.Lock()
 	defer srv.mu.Unlock()
 	return srv.allowed
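The new signature folds two outcomes into one bool: false now means either "the version is not allowed" or "the context was canceled before the check finished". A caller that needs to tell them apart can consult ctx.Err(), as in this hypothetical sketch (versionChecker and upToDate are illustrative, not part of the commit):

package example

import "context"

// versionChecker stands in for the *Service above; only the
// IsAllowed signature from the diff is assumed.
type versionChecker interface {
	IsAllowed(ctx context.Context) bool
}

// upToDate separates "disallowed" from "canceled": a false result
// together with a context error means the check never completed.
func upToDate(ctx context.Context, svc versionChecker) (bool, error) {
	allowed := svc.IsAllowed(ctx)
	if !allowed && ctx.Err() != nil {
		return false, ctx.Err()
	}
	return allowed, nil
}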

View File

@@ -133,7 +133,7 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error) {
 	data.NodeID = s.contact.Local().Id
 	data.Wallet = s.walletAddress
 	data.Version = s.versionInfo.Version
-	data.UpToDate = s.version.IsAllowed()
+	data.UpToDate = s.version.IsAllowed(ctx)
 	data.StartedAt = s.startedAt
 	data.LastPinged = s.pingStats.WhenLastPinged()

View File

@@ -48,8 +48,8 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	chore.log.Info("Storagenode contact chore starting up")
 
-	if err = chore.service.waitForSelfData(ctx); err != nil {
-		return err
+	if !chore.service.initialized.Wait(ctx) {
+		return ctx.Err()
 	}
 	return chore.Loop.Run(ctx, func(ctx context.Context) error {

View File

@@ -5,6 +5,7 @@ package contact_test
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
@@ -31,6 +32,8 @@ func TestStoragenodeContactEndpoint(t *testing.T) {
 		firstPing := pingStats.WhenLastPinged()
 
+		time.Sleep(time.Second) //HACKFIX: windows has large time granularity
+
 		resp, err = conn.ContactClient().PingNode(ctx, &pb.ContactPingRequest{})
 		require.NotNil(t, resp)
 		require.NoError(t, err)

View File

@@ -4,7 +4,6 @@
 package contact
 
 import (
-	"context"
 	"sync"
 	"time"
 
@@ -12,6 +11,7 @@ import (
 	"go.uber.org/zap"
 	"gopkg.in/spacemonkeygo/monkit.v2"
 
+	"storj.io/storj/internal/sync2"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/satellite/overlay"
 )
@@ -36,8 +36,7 @@ type Service struct {
 	mu   sync.Mutex
 	self *overlay.NodeDossier
 
-	gotSelfData bool
-	dataWaiter  chan struct{}
+	initialized sync2.Fence
 }
 
 // NewService creates a new contact service
@@ -45,7 +44,6 @@ func NewService(log *zap.Logger, self *overlay.NodeDossier) *Service {
 	return &Service{
 		log:  log,
 		self: self,
-		dataWaiter: make(chan struct{}),
 	}
 }
 
@@ -63,20 +61,6 @@ func (service *Service) UpdateSelf(capacity *pb.NodeCapacity) {
 	if capacity != nil {
 		service.self.Capacity = *capacity
 	}
-	if !service.gotSelfData {
-		service.gotSelfData = true
-		close(service.dataWaiter)
-	}
-}
-
-// waitForSelfData waits for any incoming call to the .UpdateSelf() method on this
-// contact.Service instance, then returns. If the given context is canceled first,
-// the relevant error is returned instead.
-func (service *Service) waitForSelfData(ctx context.Context) error {
-	select {
-	case <-service.dataWaiter:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	}
+
+	service.initialized.Release()
 }
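Net effect in this file: a bool flag guarded by the mutex plus a hand-made channel collapse into a single sync2.Fence, and the dedicated waitForSelfData method disappears because the fence is context-aware on its own. The same migration in miniature, with hypothetical names:

package example

import (
	"context"
	"sync"

	"storj.io/storj/internal/sync2"
)

// before: a flag and a channel that must be created in a constructor
// and closed exactly once under the mutex.
type serviceBefore struct {
	mu          sync.Mutex
	gotSelfData bool
	dataWaiter  chan struct{}
}

// after: the zero-value fence needs no constructor, and Release is
// idempotent, so the "already closed" bookkeeping goes away.
type serviceAfter struct {
	initialized sync2.Fence
}

func (s *serviceAfter) UpdateSelf() {
	// ... mutate state under a mutex as before ...
	s.initialized.Release()
}

func (s *serviceAfter) waitReady(ctx context.Context) error {
	if !s.initialized.Wait(ctx) {
		return ctx.Err()
	}
	return nil
}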