storagenode/piecestore: available storage check added in Upload

Change-Id: I71e9e5f335d4320d5de8b374fe747fec43179f78
This commit is contained in:
Qweder93 2020-05-22 21:12:26 +03:00 committed by Nikolai Siedov
parent df0ef7e0cd
commit 89c9672ce0
3 changed files with 134 additions and 0 deletions

View File

@ -0,0 +1,56 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package testblobs
import (
"go.uber.org/zap"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
)
// ensures that limitedSpaceDB implements storagenode.DB.
var _ storagenode.DB = (*limitedSpaceDB)(nil)
// limitedSpaceDB implements storage node DB with limited free space.
type limitedSpaceDB struct {
storagenode.DB
log *zap.Logger
blobs *LimitedSpaceBlobs
}
// NewLimitedSpaceDB creates a new storage node DB with limited free space.
func NewLimitedSpaceDB(log *zap.Logger, db storagenode.DB, freeSpace int64) storagenode.DB {
return &limitedSpaceDB{
DB: db,
blobs: newLimitedSpaceBlobs(log, db.Pieces(), freeSpace),
log: log,
}
}
// Pieces returns the blob store.
func (lim *limitedSpaceDB) Pieces() storage.Blobs {
return lim.blobs
}
// LimitedSpaceBlobs implements a limited space blob store.
type LimitedSpaceBlobs struct {
storage.Blobs
log *zap.Logger
freeSpace int64
}
// newLimitedSpaceBlobs creates a new limited space blob store wrapping the provided blobs.
func newLimitedSpaceBlobs(log *zap.Logger, blobs storage.Blobs, freeSpace int64) *LimitedSpaceBlobs {
return &LimitedSpaceBlobs{
log: log,
Blobs: blobs,
freeSpace: freeSpace,
}
}
// FreeSpace returns how much free space left for writing.
func (limspace *LimitedSpaceBlobs) FreeSpace() (int64, error) {
return limspace.freeSpace, nil
}

View File

@ -219,6 +219,15 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
}
limit := message.Limit
status, err := endpoint.store.StorageStatus(ctx)
if err != nil {
return err
}
if status.DiskFree < limit.Limit {
return rpcstatus.Errorf(rpcstatus.Aborted, "not enough available disk space, have: %v, need: %v", status.DiskFree, limit.Limit)
}
// TODO: verify that we have have expected amount of storage before continuing
if limit.Action != pb.PieceAction_PUT && limit.Action != pb.PieceAction_PUT_REPAIR {

View File

@ -25,6 +25,7 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testblobs"
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
@ -162,6 +163,74 @@ func TestUpload(t *testing.T) {
})
}
func TestUploadOverAvailable(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewLimitedSpaceDB(log.Named("overload"), db, 3000000), nil
},
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.Monitor.MinimumDiskSpace = 3 * memory.MB
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
require.NoError(t, err)
defer ctx.Check(client.Close)
var tt struct {
pieceID storj.PieceID
contentLength memory.Size
action pb.PieceAction
err string
}
tt.pieceID = storj.PieceID{1}
tt.contentLength = 5 * memory.MB
tt.action = pb.PieceAction_PUT
tt.err = "not enough available disk space, have: 3000000, need: 5000000"
data := testrand.Bytes(tt.contentLength)
serialNumber := testrand.SerialNumber()
orderLimit, piecePrivateKey := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
planet.StorageNodes[0].ID(),
tt.pieceID,
tt.action,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(len(data)),
)
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
require.NoError(t, err)
uploader, err := client.Upload(ctx, orderLimit, piecePrivateKey)
require.NoError(t, err)
_, err = uploader.Write(data)
require.Error(t, err)
pieceHash, err := uploader.Commit(ctx)
if tt.err != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.err)
} else {
require.NoError(t, err)
expectedHash := pkcrypto.SHA256Hash(data)
assert.Equal(t, expectedHash, pieceHash.Hash)
signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
}
})
}
func TestDownload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,