ensure TestParallel doesn't deadlock on error (#1808)

This commit is contained in:
Egon Elbre 2019-04-24 13:15:46 +03:00 committed by GitHub
parent 7ba1a2bc53
commit c284cfde30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 38 deletions

View File

@ -38,6 +38,7 @@ pipeline {
STORJ_POSTGRES_TEST = 'postgres://postgres@localhost/teststorj?sslmode=disable' STORJ_POSTGRES_TEST = 'postgres://postgres@localhost/teststorj?sslmode=disable'
} }
steps { steps {
sh 'psql -U postgres -c \'alter system set max_connections = 1000;\''
sh 'psql -U postgres -c \'create database teststorj;\'' sh 'psql -U postgres -c \'create database teststorj;\''
sh 'go run scripts/use-ports.go -from 1024 -to 10000 &' sh 'go run scripts/use-ports.go -from 1024 -to 10000 &'
sh 'go test -vet=off -timeout 9m -json -race ./... 2>&1 | tee tests.json | go run ./scripts/xunit.go -out tests.xml' sh 'go test -vet=off -timeout 9m -json -race ./... 2>&1 | tee tests.json | go run ./scripts/xunit.go -out tests.xml'

38
internal/errs2/group.go Normal file
View File

@ -0,0 +1,38 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package errs2
import "sync"
// Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
type Group struct {
wg sync.WaitGroup
mu sync.Mutex
errors []error
}
// Go calls the given function in a new goroutine.
func (group *Group) Go(f func() error) {
group.wg.Add(1)
go func() {
defer group.wg.Done()
if err := f(); err != nil {
group.mu.Lock()
defer group.mu.Unlock()
group.errors = append(group.errors, err)
}
}()
}
// Wait blocks until all function calls from the Go method have returned, then
// returns all errors (if any) from them.
func (group *Group) Wait() []error {
group.wg.Wait()
return group.errors
}

View File

@ -0,0 +1,32 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package errs2_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/errs2"
)
func TestGroup(t *testing.T) {
group := errs2.Group{}
group.Go(func() error {
return fmt.Errorf("first")
})
group.Go(func() error {
return nil
})
group.Go(func() error {
return fmt.Errorf("second")
})
group.Go(func() error {
return fmt.Errorf("third")
})
allErrors := group.Wait()
require.Len(t, allErrors, 3)
}

View File

@ -6,11 +6,11 @@ package queue_test
import ( import (
"sort" "sort"
"strconv" "strconv"
"sync"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/testcontext" "storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
"storj.io/storj/satellite" "storj.io/storj/satellite"
@ -109,71 +109,49 @@ func TestSequential(t *testing.T) {
} }
func TestParallel(t *testing.T) { func TestParallel(t *testing.T) {
t.Skip("flaky")
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
q := db.RepairQueue() q := db.RepairQueue()
const N = 100 const N = 100
errs := make(chan error, N*2)
entries := make(chan *pb.InjuredSegment, N) entries := make(chan *pb.InjuredSegment, N)
var wg sync.WaitGroup
wg.Add(N) var inserts errs2.Group
// Add to queue concurrently // Add to queue concurrently
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
go func(i int) { i := i
defer wg.Done() inserts.Go(func() error {
err := q.Insert(ctx, &pb.InjuredSegment{ return q.Insert(ctx, &pb.InjuredSegment{
Path: strconv.Itoa(i), Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)}, LostPieces: []int32{int32(i)},
}) })
if err != nil { })
errs <- err
}
}(i)
} }
wg.Wait() require.Empty(t, inserts.Wait(), "unexpected queue.Insert errors")
if len(errs) > 0 {
for err := range errs {
t.Error(err)
}
t.Fatal("unexpected queue.Insert errors")
}
wg.Add(N)
// Remove from queue concurrently // Remove from queue concurrently
var remove errs2.Group
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
go func(i int) { remove.Go(func() error {
defer wg.Done()
s, err := q.Select(ctx) s, err := q.Select(ctx)
if err != nil { if err != nil {
errs <- err return err
} }
err = q.Delete(ctx, s) err = q.Delete(ctx, s)
if err != nil { if err != nil {
errs <- err return err
} }
entries <- s entries <- s
}(i) return nil
})
} }
wg.Wait()
close(errs) require.Empty(t, remove.Wait(), "unexpected queue.Select/Delete errors")
close(entries) close(entries)
if len(errs) > 0 {
for err := range errs {
t.Error(err)
}
t.Fatal("unexpected queue.Select/Delete errors")
}
var items []*pb.InjuredSegment var items []*pb.InjuredSegment
for segment := range entries { for segment := range entries {
items = append(items, segment) items = append(items, segment)