From c284cfde300c412331029d9d3f4aed9078ed9008 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 24 Apr 2019 13:15:46 +0300 Subject: [PATCH] ensure TestParallel doesn't deadlock on error (#1808) --- Jenkinsfile.public | 1 + internal/errs2/group.go | 38 +++++++++++++++++++++ internal/errs2/group_test.go | 32 ++++++++++++++++++ pkg/datarepair/queue/queue_test.go | 54 +++++++++--------------------- 4 files changed, 87 insertions(+), 38 deletions(-) create mode 100644 internal/errs2/group.go create mode 100644 internal/errs2/group_test.go diff --git a/Jenkinsfile.public b/Jenkinsfile.public index 0c455e1db..b2324523e 100644 --- a/Jenkinsfile.public +++ b/Jenkinsfile.public @@ -38,6 +38,7 @@ pipeline { STORJ_POSTGRES_TEST = 'postgres://postgres@localhost/teststorj?sslmode=disable' } steps { + sh 'psql -U postgres -c \'alter system set max_connections = 1000;\'' sh 'psql -U postgres -c \'create database teststorj;\'' sh 'go run scripts/use-ports.go -from 1024 -to 10000 &' sh 'go test -vet=off -timeout 9m -json -race ./... 2>&1 | tee tests.json | go run ./scripts/xunit.go -out tests.xml' diff --git a/internal/errs2/group.go b/internal/errs2/group.go new file mode 100644 index 000000000..8cbad5f67 --- /dev/null +++ b/internal/errs2/group.go @@ -0,0 +1,38 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package errs2 + +import "sync" + +// Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +type Group struct { + wg sync.WaitGroup + mu sync.Mutex + errors []error +} + +// Go calls the given function in a new goroutine. +func (group *Group) Go(f func() error) { + group.wg.Add(1) + + go func() { + defer group.wg.Done() + + if err := f(); err != nil { + group.mu.Lock() + defer group.mu.Unlock() + + group.errors = append(group.errors, err) + } + }() +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns all errors (if any) from them. +func (group *Group) Wait() []error { + group.wg.Wait() + + return group.errors +} diff --git a/internal/errs2/group_test.go b/internal/errs2/group_test.go new file mode 100644 index 000000000..97eb13ec9 --- /dev/null +++ b/internal/errs2/group_test.go @@ -0,0 +1,32 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package errs2_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/errs2" +) + +func TestGroup(t *testing.T) { + group := errs2.Group{} + group.Go(func() error { + return fmt.Errorf("first") + }) + group.Go(func() error { + return nil + }) + group.Go(func() error { + return fmt.Errorf("second") + }) + group.Go(func() error { + return fmt.Errorf("third") + }) + + allErrors := group.Wait() + require.Len(t, allErrors, 3) +} diff --git a/pkg/datarepair/queue/queue_test.go b/pkg/datarepair/queue/queue_test.go index 239b84efd..7fe4066e5 100644 --- a/pkg/datarepair/queue/queue_test.go +++ b/pkg/datarepair/queue/queue_test.go @@ -6,11 +6,11 @@ package queue_test import ( "sort" "strconv" - "sync" "testing" "github.com/stretchr/testify/require" + "storj.io/storj/internal/errs2" "storj.io/storj/internal/testcontext" "storj.io/storj/pkg/pb" "storj.io/storj/satellite" @@ -109,71 +109,49 @@ func TestSequential(t *testing.T) { } func TestParallel(t *testing.T) { - t.Skip("flaky") - satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { ctx := testcontext.New(t) defer ctx.Cleanup() q := db.RepairQueue() const N = 100 - errs := make(chan error, N*2) entries := make(chan *pb.InjuredSegment, N) - var wg sync.WaitGroup - wg.Add(N) + + var inserts errs2.Group // Add to queue concurrently for i := 0; i < N; i++ { - go func(i int) { - defer wg.Done() - err := q.Insert(ctx, &pb.InjuredSegment{ + i := i + inserts.Go(func() error { + return q.Insert(ctx, &pb.InjuredSegment{ Path: strconv.Itoa(i), LostPieces: []int32{int32(i)}, }) - if err != nil { - errs <- err - } - }(i) + }) } - wg.Wait() + require.Empty(t, inserts.Wait(), "unexpected queue.Insert errors") - if len(errs) > 0 { - for err := range errs { - t.Error(err) - } - - t.Fatal("unexpected queue.Insert errors") - } - - wg.Add(N) // Remove from queue concurrently + var remove errs2.Group for i := 0; i < N; i++ { - go func(i int) { - defer wg.Done() + remove.Go(func() error { s, err := q.Select(ctx) if err != nil { - errs <- err + return err } err = q.Delete(ctx, s) if err != nil { - errs <- err + return err } entries <- s - }(i) + return nil + }) } - wg.Wait() - close(errs) + + require.Empty(t, remove.Wait(), "unexpected queue.Select/Delete errors") close(entries) - if len(errs) > 0 { - for err := range errs { - t.Error(err) - } - - t.Fatal("unexpected queue.Select/Delete errors") - } - var items []*pb.InjuredSegment for segment := range entries { items = append(items, segment)