satellite/metainfo: check bucket existence on upload and listing
Initial change for checking bucket existence on satellite side for requests like BeginObject and ListObjects. This is simple implementation that is just checking bucket in DB but should be improved in future to avoid DB calls as much as possible. Part of https://storjlabs.atlassian.net/browse/USR-365 Change-Id: I9076acddc44d7dbfa7612a1c24a007de01621583
This commit is contained in:
parent
b30a9cb1f9
commit
81afbcc12e
@ -1134,6 +1134,21 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid expiration time")
|
||||
}
|
||||
|
||||
if len(req.Bucket) == 0 {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, storj.ErrNoBucket.New("").Error())
|
||||
}
|
||||
|
||||
// TODO this needs to be optimized to avoid DB call on each request
|
||||
_, err = endpoint.metainfo.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
|
||||
if err != nil {
|
||||
if storj.ErrBucketNotFound.Has(err) {
|
||||
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
||||
}
|
||||
|
||||
endpoint.log.Error("unable to check bucket", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
// use only satellite values for Redundancy Scheme
|
||||
pbRS := endpoint.redundancyScheme()
|
||||
|
||||
@ -1383,6 +1398,17 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// TODO this needs to be optimized to avoid DB call on each request
|
||||
_, err = endpoint.metainfo.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
|
||||
if err != nil {
|
||||
if storj.ErrBucketNotFound.Has(err) {
|
||||
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
||||
}
|
||||
|
||||
endpoint.log.Error("unable to check bucket", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
prefix, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPrefix)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/errs2"
|
||||
@ -454,6 +455,34 @@ func TestListGetObjects(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBucketExistenceCheck(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
|
||||
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(metainfoClient.Close)
|
||||
|
||||
// test object methods for bucket existence check
|
||||
_, err = metainfoClient.BeginObject(ctx, metainfo.BeginObjectParams{
|
||||
Bucket: []byte("non-existing-bucket"),
|
||||
EncryptedPath: []byte("encrypted-path"),
|
||||
})
|
||||
require.Error(t, err)
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
||||
require.Equal(t, storj.ErrBucketNotFound.New("%s", "non-existing-bucket").Error(), errs.Unwrap(err).Error())
|
||||
|
||||
_, _, err = metainfoClient.ListObjects(ctx, metainfo.ListObjectsParams{
|
||||
Bucket: []byte("non-existing-bucket"),
|
||||
})
|
||||
require.Error(t, err)
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
||||
require.Equal(t, storj.ErrBucketNotFound.New("%s", "non-existing-bucket").Error(), errs.Unwrap(err).Error())
|
||||
})
|
||||
}
|
||||
|
||||
func TestBeginCommitListSegment(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
|
@ -237,7 +237,7 @@ func (endpoint *Endpoint) validateBucket(ctx context.Context, bucket []byte) (er
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(bucket) == 0 {
|
||||
return Error.New("bucket not specified")
|
||||
return Error.Wrap(storj.ErrNoBucket.New(""))
|
||||
}
|
||||
|
||||
if len(bucket) < 3 || len(bucket) > 63 {
|
||||
|
@ -73,7 +73,7 @@ func (db *bucketsDB) GetBucket(ctx context.Context, bucketName []byte, projectID
|
||||
)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return storj.Bucket{}, storj.ErrBucketNotFound.Wrap(err)
|
||||
return storj.Bucket{}, storj.ErrBucketNotFound.New("%s", bucketName)
|
||||
}
|
||||
return storj.Bucket{}, storj.ErrBucket.Wrap(err)
|
||||
}
|
||||
@ -109,7 +109,7 @@ func (db *bucketsDB) DeleteBucket(ctx context.Context, bucketName []byte, projec
|
||||
return storj.ErrBucket.Wrap(err)
|
||||
}
|
||||
if !deleted {
|
||||
return storj.ErrBucketNotFound.New("")
|
||||
return storj.ErrBucketNotFound.New("%s", bucketName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user