diff --git a/cmd/storj-sim/network.go b/cmd/storj-sim/network.go index de2fe2190..b2d79003f 100644 --- a/cmd/storj-sim/network.go +++ b/cmd/storj-sim/network.go @@ -146,7 +146,10 @@ func networkTest(flags *Flags, command string, args []string) error { processes.Start(ctx, &group, "run") for _, process := range processes.List { - process.Status.Started.Wait() + process.Status.Started.Wait(ctx) + } + if err := ctx.Err(); err != nil { + return err } cmd := exec.CommandContext(ctx, command, args...) diff --git a/cmd/storj-sim/process.go b/cmd/storj-sim/process.go index bc2f115f7..7dceed580 100644 --- a/cmd/storj-sim/process.go +++ b/cmd/storj-sim/process.go @@ -176,7 +176,9 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) { // wait for dependencies to start for _, fence := range process.Wait { - fence.Wait() + if !fence.Wait(ctx) { + return ctx.Err() + } } // in case we have an explicit delay then sleep diff --git a/internal/sync2/fence.go b/internal/sync2/fence.go index 5899365f2..68366fb2e 100644 --- a/internal/sync2/fence.go +++ b/internal/sync2/fence.go @@ -4,75 +4,62 @@ package sync2 import ( - "runtime" + "context" "sync" - "sync/atomic" ) // Fence allows to wait for something to happen. type Fence struct { - status uint32 - wait sync.Mutex + setup sync.Once + release sync.Once + done chan struct{} } -/* -General flow of the fence: - -init: - first arriver caller to `init` will setup a lock in `wait` - -Wait callers: - try to lock/unlock in Wait, this will block until the initial lock will be released - -Release caller: - first caller will release the initial lock -*/ - -const ( - statusUninitialized = iota - statusInitializing - statusBlocked - statusReleased -) - // init sets up the initial lock into wait func (fence *Fence) init() { - // wait for initialization - for atomic.LoadUint32(&fence.status) <= statusInitializing { - // first arriver sets up lock - if atomic.CompareAndSwapUint32(&fence.status, statusUninitialized, statusInitializing) { - fence.wait.Lock() - atomic.StoreUint32(&fence.status, statusBlocked) - } else { - runtime.Gosched() - } - } -} - -// Wait waits for wait to be unlocked -func (fence *Fence) Wait() { - // fast-path - if fence.Released() { - return - } - fence.init() - // start waiting on the initial lock to be released - fence.wait.Lock() - // intentionally empty critical section to wait for Release - //nolint - fence.wait.Unlock() -} - -// Released returns whether the fence has been released. -func (fence *Fence) Released() bool { - return atomic.LoadUint32(&fence.status) >= statusReleased + fence.setup.Do(func() { + fence.done = make(chan struct{}) + }) } // Release releases everyone from Wait func (fence *Fence) Release() { fence.init() - // the first one releases the status - if atomic.CompareAndSwapUint32(&fence.status, statusBlocked, statusReleased) { - fence.wait.Unlock() + fence.release.Do(func() { close(fence.done) }) +} + +// Wait waits for wait to be unlocked. +// Returns true when it was successfully released. +func (fence *Fence) Wait(ctx context.Context) bool { + fence.init() + + select { + case <-fence.done: + return true + default: + select { + case <-ctx.Done(): + return false + case <-fence.done: + return true + } } } + +// Released returns whether the fence has been released. +func (fence *Fence) Released() bool { + fence.init() + + select { + case <-fence.done: + return true + default: + return false + } +} + +// Done returns channel that will be closed when the fence is released. +func (fence *Fence) Done() chan struct{} { + fence.init() + return fence.done +} diff --git a/internal/sync2/fence_test.go b/internal/sync2/fence_test.go index 2e06aa4c7..1b29704e3 100644 --- a/internal/sync2/fence_test.go +++ b/internal/sync2/fence_test.go @@ -4,6 +4,7 @@ package sync2_test import ( + "context" "errors" "sync/atomic" "testing" @@ -12,18 +13,24 @@ import ( "golang.org/x/sync/errgroup" "storj.io/storj/internal/sync2" + "storj.io/storj/internal/testcontext" ) func TestFence(t *testing.T) { t.Parallel() + ctx := testcontext.NewWithTimeout(t, 30*time.Second) + defer ctx.Cleanup() + var group errgroup.Group var fence sync2.Fence var done int32 for i := 0; i < 10; i++ { group.Go(func() error { - fence.Wait() + if !fence.Wait(ctx) { + return errors.New("got false from Wait") + } if atomic.LoadInt32(&done) == 0 { return errors.New("fence not yet released") } @@ -46,3 +53,33 @@ func TestFence(t *testing.T) { t.Fatal(err) } } + +func TestFence_ContextCancel(t *testing.T) { + t.Parallel() + + tctx := testcontext.NewWithTimeout(t, 30*time.Second) + defer tctx.Cleanup() + + ctx, cancel := context.WithCancel(tctx) + + var group errgroup.Group + var fence sync2.Fence + + for i := 0; i < 10; i++ { + group.Go(func() error { + if fence.Wait(ctx) { + return errors.New("got true from Wait") + } + return nil + }) + } + + // wait a bit for all goroutines to hit the fence + time.Sleep(100 * time.Millisecond) + + cancel() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } +} diff --git a/internal/version/checker/service.go b/internal/version/checker/service.go index bd59511f0..55af8ec4e 100644 --- a/internal/version/checker/service.go +++ b/internal/version/checker/service.go @@ -85,8 +85,10 @@ func (srv *Service) Run(ctx context.Context) (err error) { } // IsAllowed returns whether if the Service is allowed to operate or not -func (srv *Service) IsAllowed() bool { - srv.checked.Wait() +func (srv *Service) IsAllowed(ctx context.Context) bool { + if !srv.checked.Wait(ctx) { + return false + } srv.mu.Lock() defer srv.mu.Unlock() return srv.allowed diff --git a/storagenode/console/service.go b/storagenode/console/service.go index 2c5bc2fbe..62f304eef 100644 --- a/storagenode/console/service.go +++ b/storagenode/console/service.go @@ -133,7 +133,7 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error data.NodeID = s.contact.Local().Id data.Wallet = s.walletAddress data.Version = s.versionInfo.Version - data.UpToDate = s.version.IsAllowed() + data.UpToDate = s.version.IsAllowed(ctx) data.StartedAt = s.startedAt data.LastPinged = s.pingStats.WhenLastPinged() diff --git a/storagenode/contact/chore.go b/storagenode/contact/chore.go index 3f6fab17d..2b45af792 100644 --- a/storagenode/contact/chore.go +++ b/storagenode/contact/chore.go @@ -48,8 +48,8 @@ func (chore *Chore) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) chore.log.Info("Storagenode contact chore starting up") - if err = chore.service.waitForSelfData(ctx); err != nil { - return err + if !chore.service.initialized.Wait(ctx) { + return ctx.Err() } return chore.Loop.Run(ctx, func(ctx context.Context) error { diff --git a/storagenode/contact/contact_test.go b/storagenode/contact/contact_test.go index 5b9914997..60a55c194 100644 --- a/storagenode/contact/contact_test.go +++ b/storagenode/contact/contact_test.go @@ -5,6 +5,7 @@ package contact_test import ( "testing" + "time" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -31,6 +32,8 @@ func TestStoragenodeContactEndpoint(t *testing.T) { firstPing := pingStats.WhenLastPinged() + time.Sleep(time.Second) //HACKFIX: windows has large time granularity + resp, err = conn.ContactClient().PingNode(ctx, &pb.ContactPingRequest{}) require.NotNil(t, resp) require.NoError(t, err) diff --git a/storagenode/contact/service.go b/storagenode/contact/service.go index 8f46b2786..31bfe89eb 100644 --- a/storagenode/contact/service.go +++ b/storagenode/contact/service.go @@ -4,7 +4,6 @@ package contact import ( - "context" "sync" "time" @@ -12,6 +11,7 @@ import ( "go.uber.org/zap" "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/internal/sync2" "storj.io/storj/pkg/pb" "storj.io/storj/satellite/overlay" ) @@ -36,16 +36,14 @@ type Service struct { mu sync.Mutex self *overlay.NodeDossier - gotSelfData bool - dataWaiter chan struct{} + initialized sync2.Fence } // NewService creates a new contact service func NewService(log *zap.Logger, self *overlay.NodeDossier) *Service { return &Service{ - log: log, - self: self, - dataWaiter: make(chan struct{}), + log: log, + self: self, } } @@ -63,20 +61,6 @@ func (service *Service) UpdateSelf(capacity *pb.NodeCapacity) { if capacity != nil { service.self.Capacity = *capacity } - if !service.gotSelfData { - service.gotSelfData = true - close(service.dataWaiter) - } -} -// waitForSelfData waits for any incoming call to the .UpdateSelf() method on this -// contact.Service instance, then returns. If the given context is canceled first, -// the relevant error is returned instead. -func (service *Service) waitForSelfData(ctx context.Context) error { - select { - case <-service.dataWaiter: - return nil - case <-ctx.Done(): - return ctx.Err() - } + service.initialized.Release() }