satellite/metainfo: parallel deletion tests
- parallel deletion of 50 objects and their 50 copies (one copy per object) This test is skipped because it's creating deadlocks that are not automatically retried on postgres - parallel deletion of 1 object and its 50 copies. Fixes https://github.com/storj/storj/issues/4745 Change-Id: Id7a28251c06bb12b5edcc88721f60bf7a4bc0492
This commit is contained in:
parent
b74340b1d6
commit
4725a3878c
@ -9,11 +9,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"storj.io/common/errs2"
|
"storj.io/common/errs2"
|
||||||
@ -1331,3 +1333,103 @@ func TestEndpoint_CopyObject(t *testing.T) {
|
|||||||
require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
|
require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEndpoint_ParallelDeletes(t *testing.T) {
|
||||||
|
t.Skip("to be fixed - creating deadlocks")
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1,
|
||||||
|
StorageNodeCount: 4,
|
||||||
|
UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(project.Close)
|
||||||
|
testData := testrand.Bytes(5 * memory.KiB)
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "object"+strconv.Itoa(i), testData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = project.CopyObject(ctx, "bucket", "object"+strconv.Itoa(i), "bucket", "object"+strconv.Itoa(i)+"copy", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
list := project.ListObjects(ctx, "bucket", nil)
|
||||||
|
keys := []string{}
|
||||||
|
for list.Next() {
|
||||||
|
item := list.Item()
|
||||||
|
keys = append(keys, item.Key)
|
||||||
|
}
|
||||||
|
require.NoError(t, list.Err())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(keys))
|
||||||
|
var errlist errs.Group
|
||||||
|
|
||||||
|
for i, name := range keys {
|
||||||
|
name := name
|
||||||
|
go func(toDelete string, index int) {
|
||||||
|
_, err := project.DeleteObject(ctx, "bucket", toDelete)
|
||||||
|
errlist.Add(err)
|
||||||
|
wg.Done()
|
||||||
|
}(name, i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
require.NoError(t, errlist.Err())
|
||||||
|
|
||||||
|
// check all objects have been deleted
|
||||||
|
listAfterDelete := project.ListObjects(ctx, "bucket", nil)
|
||||||
|
require.False(t, listAfterDelete.Next())
|
||||||
|
require.NoError(t, listAfterDelete.Err())
|
||||||
|
|
||||||
|
_, err = project.DeleteBucket(ctx, "bucket")
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndpoint_ParallelDeletesSameAncestor(t *testing.T) {
|
||||||
|
t.Skip("to be fixed - creating deadlocks")
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1,
|
||||||
|
StorageNodeCount: 4,
|
||||||
|
UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(project.Close)
|
||||||
|
testData := testrand.Bytes(5 * memory.KiB)
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "original-object", testData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
_, err = project.CopyObject(ctx, "bucket", "original-object", "bucket", "copy"+strconv.Itoa(i), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
list := project.ListObjects(ctx, "bucket", nil)
|
||||||
|
keys := []string{}
|
||||||
|
for list.Next() {
|
||||||
|
item := list.Item()
|
||||||
|
keys = append(keys, item.Key)
|
||||||
|
}
|
||||||
|
require.NoError(t, list.Err())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(keys))
|
||||||
|
var errlist errs.Group
|
||||||
|
|
||||||
|
for i, name := range keys {
|
||||||
|
name := name
|
||||||
|
go func(toDelete string, index int) {
|
||||||
|
_, err := project.DeleteObject(ctx, "bucket", toDelete)
|
||||||
|
errlist.Add(err)
|
||||||
|
wg.Done()
|
||||||
|
}(name, i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
require.NoError(t, errlist.Err())
|
||||||
|
|
||||||
|
// check all objects have been deleted
|
||||||
|
listAfterDelete := project.ListObjects(ctx, "bucket", nil)
|
||||||
|
require.False(t, listAfterDelete.Next())
|
||||||
|
require.NoError(t, listAfterDelete.Err())
|
||||||
|
|
||||||
|
_, err = project.DeleteBucket(ctx, "bucket")
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user