Piecestore space and bandwidth allocation check (#1527)

* Space and bandwidth allocation check

* use proper config flag

* one more check + tests

* use monitor to check space and bandwidth

* remove unused field

* check during read/write

* fix linter

* fix pieceid

* remove unused methods

* revert unneeded change
Michal Niewrzal 2019-04-15 12:12:22 +02:00 committed by Stefan Benten
parent 15eec28d86
commit e922f71f61
7 changed files with 422 additions and 46 deletions

View File

@@ -594,7 +594,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatelliteIDs []strin
			},
			Storage: psserver.Config{
				Path:                   "", // TODO: this argument won't be needed with master storagenodedb
-				AllocatedDiskSpace:     memory.TB,
+				AllocatedDiskSpace:     1500 * memory.GB,
				AllocatedBandwidth:     memory.TB,
				KBucketRefreshInterval: time.Hour,

View File

@@ -76,3 +76,14 @@ func (usage *Usage) Total() int64 {
		usage.PutRepair +
		usage.Delete
}
+
+// TotalMonthlySummary returns total bandwidth usage for current month
+func TotalMonthlySummary(ctx context.Context, db DB) (*Usage, error) {
+	return db.Summary(ctx, getBeginningOfMonth(), time.Now())
+}
+
+func getBeginningOfMonth() time.Time {
+	t := time.Now()
+	y, m, _ := t.Date()
+	return time.Date(y, m, 1, 0, 0, 0, 0, time.Now().Location())
+}
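
For reference, the new helper truncates the current time to midnight on the first day of the month in its local time zone. A minimal standalone sketch of the same truncation, using an explicit reference time instead of time.Now (the beginningOfMonth name and the example date are illustrative, not part of the commit):

package main

import (
	"fmt"
	"time"
)

// beginningOfMonth mirrors getBeginningOfMonth above, but takes an
// explicit reference time so the behavior is easy to demonstrate.
func beginningOfMonth(t time.Time) time.Time {
	y, m, _ := t.Date()
	return time.Date(y, m, 1, 0, 0, 0, 0, t.Location())
}

func main() {
	ref := time.Date(2019, time.April, 15, 12, 12, 22, 0, time.UTC)
	fmt.Println(beginningOfMonth(ref)) // 2019-04-01 00:00:00 +0000 UTC
}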

View File

@@ -61,24 +61,14 @@ func (inspector *Endpoint) retrieveStats(ctx context.Context) (*pb.StatSummaryRe
	if err != nil {
		return nil, err
	}

	// Bandwidth Usage
-	usage, err := inspector.usageDB.Summary(ctx, getBeginningOfMonth(), time.Now())
+	usage, err := bandwidth.TotalMonthlySummary(ctx, inspector.usageDB)
	if err != nil {
		return nil, err
	}

	ingress := usage.Put + usage.PutRepair
	egress := usage.Get + usage.GetAudit + usage.GetRepair
-
-	totalUsedBandwidth := int64(0)
-	oldUsage, err := inspector.psdbDB.GetTotalBandwidthBetween(getBeginningOfMonth(), time.Now())
-	if err != nil {
-		inspector.log.Warn("unable to calculate old bandwidth usage")
-	} else {
-		totalUsedBandwidth = oldUsage
-	}
-	totalUsedBandwidth += usage.Total()
+	totalUsedBandwidth := usage.Total()

	return &pb.StatSummaryResponse{
		UsedSpace:          totalUsedSpace,
@@ -159,9 +149,3 @@ func (inspector *Endpoint) Dashboard(ctx context.Context, in *pb.DashboardReques
	}

	return data, nil
}
-
-func getBeginningOfMonth() time.Time {
-	t := time.Now()
-	y, m, _ := t.Date()
-	return time.Date(y, m, 1, 0, 0, 0, 0, time.Now().Location())
-}
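
Worth noting in this hunk: the stat summary now reads only from the new bandwidth DB via TotalMonthlySummary; the fallback that folded in legacy usage from psdbDB.GetTotalBandwidthBetween (and its warning log) is dropped, so pre-migration bandwidth no longer counts toward the monthly total.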

View File

@@ -64,11 +64,11 @@ func (service *Service) Run(ctx context.Context) (err error) {
	// get the disk space details
	// The returned path ends in a slash only if it represents a root directory, such as "/" on Unix or `C:\` on Windows.
-	info, err := service.store.StorageStatus()
+	storageStatus, err := service.store.StorageStatus()
	if err != nil {
		return Error.Wrap(err)
	}

-	freeDiskSpace := info.DiskFree
+	freeDiskSpace := storageStatus.DiskFree

	totalUsed, err := service.usedSpace(ctx)
	if err != nil {
@@ -100,7 +100,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
		service.log.Warn("Used more space than allocated. Allocating space", zap.Int64("bytes", service.allocatedDiskSpace))
	}

-	// the available diskspace is less than remaining allocated space,
+	// the available disk space is less than remaining allocated space,
	// due to change of setting before restarting
	if freeDiskSpace < service.allocatedDiskSpace-totalUsed {
		service.allocatedDiskSpace = freeDiskSpace
@@ -151,15 +151,29 @@ func (service *Service) usedSpace(ctx context.Context) (int64, error) {
}

func (service *Service) usedBandwidth(ctx context.Context) (int64, error) {
-	usage, err := service.usageDB.Summary(ctx, getBeginningOfMonth(), time.Now())
+	usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
	if err != nil {
		return 0, err
	}
	return usage.Total(), nil
}

-func getBeginningOfMonth() time.Time {
-	t := time.Now()
-	y, m, _ := t.Date()
-	return time.Date(y, m, 1, 0, 0, 0, 0, time.Now().Location())
-}
+// AvailableSpace returns available disk space for upload
+func (service *Service) AvailableSpace(ctx context.Context) (int64, error) {
+	usedSpace, err := service.pieceInfo.SpaceUsed(ctx)
+	if err != nil {
+		return 0, Error.Wrap(err)
+	}
+	allocatedSpace := service.allocatedDiskSpace
+	return allocatedSpace - usedSpace, nil
+}
+
+// AvailableBandwidth returns available bandwidth for upload/download
+func (service *Service) AvailableBandwidth(ctx context.Context) (int64, error) {
+	usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
+	if err != nil {
+		return 0, Error.Wrap(err)
+	}
+	allocatedBandwidth := service.allocatedBandwidth
+	return allocatedBandwidth - usage.Total(), nil
+}
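
Together these two accessors give callers a cheap pre-flight check before accepting data. A hedged sketch of such a caller, written against a small interface so it runs standalone (canAccept, availability, and fixed are illustrative names; only AvailableSpace and AvailableBandwidth come from this commit):

package main

import (
	"context"
	"errors"
	"fmt"
)

// availability is the slice of monitor.Service this sketch relies on;
// both methods exist on the real service after this commit.
type availability interface {
	AvailableSpace(ctx context.Context) (int64, error)
	AvailableBandwidth(ctx context.Context) (int64, error)
}

// canAccept refuses a request whose declared size exceeds either the
// remaining disk allocation or the remaining monthly bandwidth.
func canAccept(ctx context.Context, mon availability, declaredSize int64) error {
	space, err := mon.AvailableSpace(ctx)
	if err != nil {
		return err
	}
	bw, err := mon.AvailableBandwidth(ctx)
	if err != nil {
		return err
	}
	if declaredSize > space {
		return errors.New("out of space")
	}
	if declaredSize > bw {
		return errors.New("out of bandwidth")
	}
	return nil
}

// fixed is a stub availability source for demonstration.
type fixed struct{ space, bw int64 }

func (f fixed) AvailableSpace(ctx context.Context) (int64, error)     { return f.space, nil }
func (f fixed) AvailableBandwidth(ctx context.Context) (int64, error) { return f.bw, nil }

func main() {
	err := canAccept(context.Background(), fixed{space: 4 << 10, bw: 64 << 10}, 8<<10)
	fmt.Println(err) // out of space
}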

View File

@@ -204,10 +204,23 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
		peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"), peer.DB.Pieces())

+		peer.Storage2.Monitor = monitor.NewService(
+			log.Named("piecestore:monitor"),
+			peer.Kademlia.RoutingTable,
+			peer.Storage2.Store,
+			peer.DB.PieceInfo(),
+			peer.DB.Bandwidth(),
+			config.Storage.AllocatedDiskSpace.Int64(),
+			config.Storage.AllocatedBandwidth.Int64(),
+			//TODO use config.Storage.Monitor.Interval, but for some reason is not set
+			config.Storage.KBucketRefreshInterval,
+		)
+
		peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
			peer.Log.Named("piecestore"),
			signing.SignerFromFullIdentity(peer.Identity),
			peer.Storage2.Trust,
+			peer.Storage2.Monitor,
			peer.Storage2.Store,
			peer.DB.PieceInfo(),
			peer.DB.Orders(),
@@ -230,18 +243,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
		)
		pb.RegisterPieceStoreInspectorServer(peer.Server.PrivateGRPC(), peer.Storage2.Inspector)

-		peer.Storage2.Monitor = monitor.NewService(
-			log.Named("piecestore:monitor"),
-			peer.Kademlia.RoutingTable,
-			peer.Storage2.Store,
-			peer.DB.PieceInfo(),
-			peer.DB.Bandwidth(),
-			config.Storage.AllocatedDiskSpace.Int64(),
-			config.Storage.AllocatedBandwidth.Int64(),
-			//TODO use config.Storage.Monitor.Interval, but for some reason is not set
-			config.Storage.KBucketRefreshInterval,
-		)
-
		peer.Storage2.Sender = orders.NewSender(
			log.Named("piecestore:orderssender"),
			peer.Transport,
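
One ordering constraint to notice here: monitor.NewService now runs before piecestore.NewEndpoint because the endpoint takes the monitor as a constructor argument, which is why the original construction site below the inspector registration is removed rather than kept as a duplicate.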

View File

@@ -51,8 +51,9 @@ type Endpoint struct {
	log    *zap.Logger
	config Config

-	signer signing.Signer
-	trust  *trust.Pool
+	signer  signing.Signer
+	trust   *trust.Pool
+	monitor *monitor.Service

	store     *pieces.Store
	pieceinfo pieces.DB
@@ -62,13 +63,14 @@ type Endpoint struct {
}

// NewEndpoint creates a new piecestore endpoint.
-func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, store *pieces.Store, pieceinfo pieces.DB, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
+func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, store *pieces.Store, pieceinfo pieces.DB, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
	return &Endpoint{
		log:    log,
		config: config,

-		signer: signer,
-		trust:  trust,
+		signer:  signer,
+		trust:   trust,
+		monitor: monitor,

		store:     store,
		pieceinfo: pieceinfo,
@@ -163,6 +165,16 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
		}
	}()

+	availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
+	if err != nil {
+		return ErrInternal.Wrap(err)
+	}
+
+	availableSpace, err := endpoint.monitor.AvailableSpace(ctx)
+	if err != nil {
+		return ErrInternal.Wrap(err)
+	}
+
	largestOrder := pb.Order2{}
	defer endpoint.SaveOrder(ctx, limit, &largestOrder, peer)

@@ -192,11 +204,21 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
			return ErrProtocol.New("chunk out of order") // TODO: report grpc status bad message
		}

-		if largestOrder.Amount < pieceWriter.Size()+int64(len(message.Chunk.Data)) {
+		chunkSize := int64(len(message.Chunk.Data))
+		if largestOrder.Amount < pieceWriter.Size()+chunkSize {
			// TODO: should we write currently and give a chance for uplink to remedy the situation?
			return ErrProtocol.New("not enough allocated, allocated=%v writing=%v", largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) // TODO: report grpc status ?
		}

+		availableBandwidth -= chunkSize
+		if availableBandwidth < 0 {
+			return ErrProtocol.New("out of bandwidth")
+		}
+		availableSpace -= chunkSize
+		if availableSpace < 0 {
+			return ErrProtocol.New("out of space")
+		}
+
		if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
			return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
		}
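
The pattern here is to snapshot the remaining allocation once per request and then debit a local counter per chunk, so the databases are not queried on every message. A standalone sketch of that debit-and-check loop (function name and error strings are illustrative):

package main

import (
	"errors"
	"fmt"
)

// consumeChunks debits local bandwidth and space budgets chunk by chunk,
// failing as soon as either would go negative, mirroring the Upload loop.
func consumeChunks(chunks [][]byte, availableBandwidth, availableSpace int64) error {
	for _, chunk := range chunks {
		chunkSize := int64(len(chunk))

		availableBandwidth -= chunkSize
		if availableBandwidth < 0 {
			return errors.New("out of bandwidth")
		}
		availableSpace -= chunkSize
		if availableSpace < 0 {
			return errors.New("out of space")
		}
	}
	return nil
}

func main() {
	chunks := [][]byte{make([]byte, 300), make([]byte, 300)}
	fmt.Println(consumeChunks(chunks, 1000, 500)) // out of space
}

One consequence of the per-request snapshot, worth noting: concurrent uploads each start from the same remaining allocation, so the node can briefly overshoot; budgets reconcile once usage is persisted and the next request takes a fresh snapshot.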
@@ -317,6 +339,11 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
		return Error.New("requested more data than available, requesting=%v available=%v", chunk.Offset+chunk.ChunkSize, pieceReader.Size())
	}

+	availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
+	if err != nil {
+		return ErrInternal.Wrap(err)
+	}
+
	throttle := sync2.NewThrottle()
	// TODO: see whether this can be implemented without a goroutine

@@ -389,7 +416,14 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
			if err := endpoint.VerifyOrder(ctx, peer, limit, message.Order, largestOrder.Amount); err != nil {
				return err
			}
-			if err := throttle.Produce(message.Order.Amount - largestOrder.Amount); err != nil {
+
+			chunkSize := message.Order.Amount - largestOrder.Amount
+			availableBandwidth -= chunkSize
+			if availableBandwidth < 0 {
+				return ErrProtocol.New("out of bandwidth")
+			}
+
+			if err := throttle.Produce(chunkSize); err != nil {
				// shouldn't happen since only receiving side is calling Fail
				return ErrInternal.Wrap(err)
			}
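
Since orders carry cumulative amounts, the download path debits only the delta since the largest order seen so far. For example, if successive orders allocate 100, 250, and 400 bytes in total, the per-message debits are 100, 150, and 150, so no byte is charged against the bandwidth budget twice.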

View File

@@ -0,0 +1,332 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information

package piecestore_test

import (
	"context"
	"fmt"
	"math/rand"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/zeebo/errs"

	"storj.io/storj/internal/memory"
	"storj.io/storj/internal/testcontext"
	"storj.io/storj/internal/testplanet"
	"storj.io/storj/pkg/auth/signing"
	"storj.io/storj/pkg/identity"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/storagenode/pieces"
)

const oneWeek = 7 * 24 * time.Hour

func TestOrderLimitPutValidation(t *testing.T) {
	for i, tt := range []struct {
		useUnknownSatellite bool
		pieceID             storj.PieceID
		action              pb.PieceAction
		serialNumber        storj.SerialNumber
		pieceExpiration     time.Duration
		orderExpiration     time.Duration
		limit               int64
		availableBandwidth  int64
		availableSpace      int64
		err                 string
	}{
		{ // unapproved satellite id
			useUnknownSatellite: true,
			pieceID:             storj.PieceID{1},
			action:              pb.PieceAction_PUT,
			serialNumber:        storj.SerialNumber{1},
			pieceExpiration:     oneWeek,
			orderExpiration:     oneWeek,
			limit:               memory.KiB.Int64(),
			err:                 " is untrusted",
		},
		{ // approved satellite id
			pieceID:         storj.PieceID{2},
			action:          pb.PieceAction_PUT,
			serialNumber:    storj.SerialNumber{2},
			pieceExpiration: oneWeek,
			orderExpiration: oneWeek,
			limit:           10 * memory.KiB.Int64(),
		},
		{ // wrong action type
			pieceID:         storj.PieceID{3},
			action:          pb.PieceAction_GET,
			serialNumber:    storj.SerialNumber{3},
			pieceExpiration: oneWeek,
			orderExpiration: oneWeek,
			limit:           memory.KiB.Int64(),
			err:             "expected put or put repair action got GET",
		},
		{ // piece expired
			pieceID:         storj.PieceID{4},
			action:          pb.PieceAction_PUT,
			serialNumber:    storj.SerialNumber{4},
			pieceExpiration: -4 * 24 * time.Hour,
			orderExpiration: oneWeek,
			limit:           memory.KiB.Int64(),
			err:             "piece expired:",
		},
		{ // limit is negative
			pieceID:         storj.PieceID{5},
			action:          pb.PieceAction_PUT,
			serialNumber:    storj.SerialNumber{5},
			pieceExpiration: oneWeek,
			orderExpiration: oneWeek,
			limit:           -1,
			err:             "order limit is negative",
		},
		{ // order limit expired
			pieceID:         storj.PieceID{6},
			action:          pb.PieceAction_PUT,
			serialNumber:    storj.SerialNumber{6},
			pieceExpiration: oneWeek,
			orderExpiration: -4 * 24 * time.Hour,
			limit:           memory.KiB.Int64(),
			err:             "order expired:",
		},
		{ // allocated bandwidth limit
			pieceID:            storj.PieceID{7},
			action:             pb.PieceAction_PUT,
			serialNumber:       storj.SerialNumber{7},
			pieceExpiration:    oneWeek,
			orderExpiration:    oneWeek,
			limit:              10 * memory.KiB.Int64(),
			availableBandwidth: 5 * memory.KiB.Int64(),
			err:                "out of bandwidth",
		},
		{ // allocated space limit
			pieceID:         storj.PieceID{8},
			action:          pb.PieceAction_PUT,
			serialNumber:    storj.SerialNumber{8},
			pieceExpiration: oneWeek,
			orderExpiration: oneWeek,
			limit:           10 * memory.KiB.Int64(),
			availableSpace:  5 * memory.KiB.Int64(),
			err:             "out of space",
		},
	} {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		planet, err := testplanet.New(t, 1, 1, 1)
		require.NoError(t, err)
		defer ctx.Check(planet.Shutdown)

		planet.Start(ctx)

		// set desirable bandwidth
		setBandwidth(ctx, t, planet, tt.availableBandwidth)
		// set desirable space
		setSpace(ctx, t, planet, tt.availableSpace)

		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)

		signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
		satellite := planet.Satellites[0].Identity
		if tt.useUnknownSatellite {
			unapprovedSatellite, err := planet.NewIdentity()
			require.NoError(t, err)
			signer = signing.SignerFromFullIdentity(unapprovedSatellite)
			satellite = unapprovedSatellite
		}

		orderLimit := GenerateOrderLimit(
			t,
			satellite.ID,
			planet.Uplinks[0].ID(),
			planet.StorageNodes[0].ID(),
			tt.pieceID,
			tt.action,
			tt.serialNumber,
			tt.pieceExpiration,
			tt.orderExpiration,
			tt.limit,
		)

		orderLimit, err = signing.SignOrderLimit(signer, orderLimit)
		require.NoError(t, err)

		uploader, err := client.Upload(ctx, orderLimit)
		require.NoError(t, err)

		var writeErr error
		buffer := make([]byte, memory.KiB)
		for i := 0; i < 10; i++ {
			_, _ = rand.Read(buffer)
			_, writeErr = uploader.Write(buffer)
			if writeErr != nil {
				break
			}
		}

		_, commitErr := uploader.Commit()
		err = errs.Combine(writeErr, commitErr)

		testIndex := fmt.Sprintf("#%d", i)
		if tt.err != "" {
			require.Error(t, err, testIndex)
			require.Contains(t, err.Error(), tt.err, testIndex)
		} else {
			require.NoError(t, err, testIndex)
		}
	}
}

func TestOrderLimitGetValidation(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	planet, err := testplanet.New(t, 1, 1, 1)
	require.NoError(t, err)
	defer ctx.Check(planet.Shutdown)

	planet.Start(ctx)

	defaultPieceSize := 10 * memory.KiB

	for _, storageNode := range planet.StorageNodes {
		err = storageNode.DB.Bandwidth().Add(ctx, planet.Satellites[0].ID(), pb.PieceAction_GET, memory.TB.Int64()-(15*memory.KiB.Int64()), time.Now())
		require.NoError(t, err)
	}

	{ // upload test piece
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)

		signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
		satellite := planet.Satellites[0].Identity

		orderLimit := GenerateOrderLimit(
			t,
			satellite.ID,
			planet.Uplinks[0].ID(),
			planet.StorageNodes[0].ID(),
			storj.PieceID{1},
			pb.PieceAction_PUT,
			storj.SerialNumber{0},
			oneWeek,
			oneWeek,
			defaultPieceSize.Int64(),
		)

		orderLimit, err = signing.SignOrderLimit(signer, orderLimit)
		require.NoError(t, err)

		uploader, err := client.Upload(ctx, orderLimit)
		require.NoError(t, err)

		data := make([]byte, defaultPieceSize)
		_, _ = rand.Read(data)

		_, err = uploader.Write(data)
		require.NoError(t, err)
		_, err = uploader.Commit()
		require.NoError(t, err)
	}

	for _, tt := range []struct {
		satellite       *identity.FullIdentity
		pieceID         storj.PieceID
		action          pb.PieceAction
		serialNumber    storj.SerialNumber
		pieceExpiration time.Duration
		orderExpiration time.Duration
		limit           int64
		err             string
	}{
		{ // allocated bandwidth limit
			pieceID:         storj.PieceID{1},
			action:          pb.PieceAction_GET,
			serialNumber:    storj.SerialNumber{1},
			pieceExpiration: oneWeek,
			orderExpiration: oneWeek,
			limit:           10 * memory.KiB.Int64(),
			err:             "out of bandwidth",
		},
	} {
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)

		signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
		satellite := planet.Satellites[0].Identity
		if tt.satellite != nil {
			signer = signing.SignerFromFullIdentity(tt.satellite)
			satellite = tt.satellite
		}

		orderLimit := GenerateOrderLimit(
			t,
			satellite.ID,
			planet.Uplinks[0].ID(),
			planet.StorageNodes[0].ID(),
			tt.pieceID,
			tt.action,
			tt.serialNumber,
			tt.pieceExpiration,
			tt.orderExpiration,
			tt.limit,
		)

		orderLimit, err = signing.SignOrderLimit(signer, orderLimit)
		require.NoError(t, err)

		downloader, err := client.Download(ctx, orderLimit, 0, tt.limit)
		require.NoError(t, err)

		var readErr error
		buffer := make([]byte, memory.KiB)
		for i := 0; i < 10; i++ {
			_, readErr = downloader.Read(buffer)
			if readErr != nil {
				break
			}
		}

		closeErr := downloader.Close()
		err = errs.Combine(readErr, closeErr)

		if tt.err != "" {
			require.Error(t, err)
			require.Contains(t, err.Error(), tt.err)
		} else {
			require.NoError(t, err)
		}
	}
}

func setBandwidth(ctx context.Context, t *testing.T, planet *testplanet.Planet, bandwidth int64) {
	if bandwidth == 0 {
		return
	}
	for _, storageNode := range planet.StorageNodes {
		availableBandwidth, err := storageNode.Storage2.Monitor.AvailableBandwidth(ctx)
		require.NoError(t, err)
		diff := (bandwidth - availableBandwidth) * -1
		err = storageNode.DB.Bandwidth().Add(ctx, planet.Satellites[0].ID(), pb.PieceAction_GET, diff, time.Now())
		require.NoError(t, err)
	}
}

func setSpace(ctx context.Context, t *testing.T, planet *testplanet.Planet, space int64) {
	if space == 0 {
		return
	}
	for _, storageNode := range planet.StorageNodes {
		availableSpace, err := storageNode.Storage2.Monitor.AvailableSpace(ctx)
		require.NoError(t, err)
		diff := (space - availableSpace) * -1
		err = storageNode.DB.PieceInfo().Add(ctx, &pieces.Info{
			SatelliteID:     planet.Satellites[0].ID(),
			PieceID:         storj.PieceID{99},
			PieceSize:       diff,
			Uplink:          planet.Uplinks[0].Identity.PeerIdentity(),
			UplinkPieceHash: &pb.PieceHash{},
		})
		require.NoError(t, err)
	}
}
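
Both helpers steer a node toward a desired availability by charging the surplus: diff = (desired - current) * -1, i.e. current - desired. With the test planet's 1 TB bandwidth allocation and a desired availableBandwidth of 5 KiB, for instance, roughly 1 TB - 5 KiB is recorded as GET usage, after which AvailableBandwidth reports about 5 KiB; setSpace achieves the same for disk by inserting a synthetic piece (PieceID{99}) of that size into the piece-info table.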