Jess G 022f5d2e14
storagenode: add space used cache for pieces (#2753)
* add cache, update cache w/piece create/delete

* add service w/loop to cache to recalculate space used cache

* add piecestore cache to other sn svcs to use

* add table to persist the total space used

* rm cache where not needed

* rm stuff from sn svcs

* start fixing tests, changes per comments

* update commits

* add unit tests

* fix committing before we write header bytes

* fix cache create test

* copy cache map, add started back to recalc

* fix test

* add test, update comments
2019-08-12 14:43:05 -07:00

720 lines
26 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package piecestore
import (
monkit ""
var (
mon = monkit.Package()
// Error is the default error class for piecestore errors
Error = errs.Class("piecestore")
// ErrProtocol is the default error class for protocol errors.
ErrProtocol = errs.Class("piecestore protocol")
// ErrInternal is the default error class for internal piecestore errors.
ErrInternal = errs.Class("piecestore internal")
var _ pb.PiecestoreServer = (*Endpoint)(nil)
// OldConfig contains everything necessary for a server
type OldConfig struct {
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
WhitelistedSatellites storj.NodeURLs `help:"a comma-separated list of approved satellite node urls" devDefault:"" releaseDefault:",,,"`
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"2TB"`
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"1h0m0s"`
RetainStatus RetainStatus `help:"allows configuration to enable, disable, or test retain requests from the satellite. Options: (disabled/enabled/debug)" default:"disabled"`
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
Monitor monitor.Config
Sender orders.SenderConfig
// RetainStatus is a type defining the enabled/disabled status of retain requests.
type RetainStatus uint32

const (
	// RetainDisabled means we do not do anything with retain requests.
	// Values start at 1, so the zero value of RetainStatus is not a valid status.
	RetainDisabled RetainStatus = iota + 1
	// RetainEnabled means we fully enable retain requests and delete data not defined by bloom filter.
	RetainEnabled
	// RetainDebug means we partially enable retain requests, and print out pieces we should delete, without actually deleting them.
	RetainDebug
)
// Set implements pflag.Value
func (v *RetainStatus) Set(s string) error {
switch s {
case "disabled":
*v = RetainDisabled
case "enabled":
*v = RetainEnabled
case "debug":
*v = RetainDebug
return Error.New("invalid RetainStatus %q", s)
return nil
// Type implements pflag.Value; it reports the flag's type name.
func (*RetainStatus) Type() string {
	const typeName = "storj.RetainStatus"
	return typeName
}
// String implements pflag.Value
func (v *RetainStatus) String() string {
switch *v {
case RetainDisabled:
return "disabled"
case RetainEnabled:
return "enabled"
case RetainDebug:
return "debug"
return "invalid"
// Endpoint implements uploading, downloading and deleting for a storage node.
type Endpoint struct {
log *zap.Logger
config Config
signer signing.Signer
trust *trust.Pool
monitor *monitor.Service
store *pieces.Store
orders orders.DB
usage bandwidth.DB
usedSerials UsedSerials
liveRequests int32
// NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
return &Endpoint{
log: log,
config: config,
signer: signer,
trust: trust,
monitor: monitor,
store: store,
orders: orders,
usage: usage,
usedSerials: usedSerials,
liveRequests: 0,
}, nil
// monLiveRequests is the named monitoring task ("live-request") shared by
// Delete, Upload and Download; each of them defers it on entry.
var monLiveRequests = mon.TaskNamed("live-request")
// Delete handles deleting a piece on piece store.
//
// The request's order limit must carry the DELETE action and pass
// verifyOrderLimit; otherwise an error of class Error is returned. A failure
// of the delete itself is only logged: the caller still receives an empty
// response and a nil error.
//
// NOTE(review): this listing appears to have lost lines during extraction
// (closing braces, and the call expression at the start of the
// `if err :=, ...` statement), so it does not compile as shown. Restore the
// block from version control before changing it.
func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err)
// Count this request as live for its whole duration.
atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
if delete.Limit.Action != pb.PieceAction_DELETE {
return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report grpc status unauthorized or bad request
if err := endpoint.verifyOrderLimit(ctx, delete.Limit); err != nil {
// TODO: report grpc status unauthorized or bad request
return nil, Error.Wrap(err)
// NOTE(review): the callee in front of ", delete.Limit.SatelliteId" is missing
// from this listing; presumably a delete call on endpoint.store — confirm.
if err :=, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
// The error is deliberately not returned to the caller; it is only logged.
// TODO: add more debug info
endpoint.log.Error("delete failed", zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err))
// TODO: report internal server internal or missing error using grpc status,
// e.g. missing might happen when we get a deletion request after garbage collection has deleted it
} else {
endpoint.log.Info("deleted", zap.Stringer("Piece ID", delete.Limit.PieceId))
return &pb.PieceDeleteResponse{}, nil
// Upload handles uploading a piece on piece store.
//
// Visible flow:
//   - the request is rejected with codes.Unavailable when the number of live
//     requests exceeds config.MaxConcurrentRequests;
//   - the first stream message must carry an order limit whose action is PUT
//     or PUT_REPAIR and which passes verifyOrderLimit;
//   - every following message may carry an order, a chunk and/or a done
//     marker. Orders are checked with VerifyOrder; chunks must arrive at the
//     current write offset, fit inside the largest verified order amount and
//     inside the bandwidth/space figures read from the monitor before the loop;
//   - on done, the piece hash and size are verified, the piece is committed
//     together with its header, an expiration is recorded when the limit has
//     one, and a signed piece hash is sent back with SendAndClose.
//
// The largest verified order is handed to saveOrder via defer on every exit
// path after the loop has been set up.
//
// NOTE(review): this listing appears to have lost lines during extraction
// (closing braces, bare statements, and the call expression at the start of
// statements of the form `x =, args)`), so it does not compile as shown.
// Restore the block from version control before changing it.
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
ctx := stream.Context()
defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err)
// Count this request as live; the post-increment value decides admission.
liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
if int(liveRequests) > endpoint.config.MaxConcurrentRequests {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return status.Error(codes.Unavailable, "storage node overloaded")
startTime := time.Now().UTC()
// TODO: set connection timeouts
// TODO: set maximum message size
var message *pb.PieceUploadRequest
// The first message must contain the order limit.
message, err = stream.Recv()
switch {
case err != nil:
return ErrProtocol.Wrap(err)
case message == nil:
return ErrProtocol.New("expected a message")
case message.Limit == nil:
return ErrProtocol.New("expected order limit as the first message")
limit := message.Limit
endpoint.log.Info("upload started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
// TODO: verify that we have the expected amount of storage before continuing
if limit.Action != pb.PieceAction_PUT && limit.Action != pb.PieceAction_PUT_REPAIR {
return ErrProtocol.New("expected put or put repair action got %v", limit.Action) // TODO: report grpc status unauthorized or bad request
if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
return err
var pieceWriter *pieces.Writer
// Deferred bookkeeping: computes size, rate and duration of the upload and
// logs the outcome based on the named result err.
// NOTE(review): uploadRate and uploadDuration are assigned but not read in the
// visible code; presumably the statements consuming them were lost — confirm.
defer func() {
endTime := time.Now().UTC()
dt := endTime.Sub(startTime)
uploadSize := int64(0)
if pieceWriter != nil {
uploadSize = pieceWriter.Size()
uploadRate := float64(0)
if dt.Seconds() > 0 {
uploadRate = float64(uploadSize) / dt.Seconds()
uploadDuration := dt.Nanoseconds()
if err != nil {
endpoint.log.Info("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
} else {
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
// NOTE(review): the callee in front of ", limit.SatelliteId" is missing from
// this listing; presumably it opens a piece writer on endpoint.store — confirm.
pieceWriter, err =, limit.SatelliteId, limit.PieceId)
if err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
defer func() {
// Cancel the piece write on exit; the comment in the original says this
// matters only when the write has not been committed.
if cancelErr := pieceWriter.Cancel(ctx); cancelErr != nil {
endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr))
// Budgets are read once here and decremented locally per chunk below.
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
if err != nil {
return ErrInternal.Wrap(err)
availableSpace, err := endpoint.monitor.AvailableSpace(ctx)
if err != nil {
return ErrInternal.Wrap(err)
// largestOrder tracks the most recent verified order; it is saved on exit.
largestOrder := pb.Order{}
defer endpoint.saveOrder(ctx, limit, &largestOrder)
for {
message, err = stream.Recv() // TODO: reuse messages to avoid allocations
// The stream must not end before a done message has been handled.
if err == io.EOF {
return ErrProtocol.New("unexpected EOF")
} else if err != nil {
return ErrProtocol.Wrap(err) // TODO: report grpc status bad message
if message == nil {
return ErrProtocol.New("expected a message") // TODO: report grpc status bad message
if message.Order == nil && message.Chunk == nil && message.Done == nil {
return ErrProtocol.New("expected a message") // TODO: report grpc status bad message
if message.Order != nil {
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
return err
largestOrder = *message.Order
if message.Chunk != nil {
// Chunks must be contiguous: the offset has to equal what was written so far.
if message.Chunk.Offset != pieceWriter.Size() {
return ErrProtocol.New("chunk out of order") // TODO: report grpc status bad message
chunkSize := int64(len(message.Chunk.Data))
// The uplink must have sent an order covering the data before sending it.
if largestOrder.Amount < pieceWriter.Size()+chunkSize {
// TODO: should we write currently and give a chance for uplink to remedy the situation?
return ErrProtocol.New("not enough allocated, allocated=%v writing=%v", largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) // TODO: report grpc status ?
availableBandwidth -= chunkSize
if availableBandwidth < 0 {
return ErrProtocol.New("out of bandwidth")
availableSpace -= chunkSize
if availableSpace < 0 {
return ErrProtocol.New("out of space")
if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
if message.Done != nil {
calculatedHash := pieceWriter.Hash()
if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
return err // TODO: report grpc status internal server error
if message.Done.PieceSize != pieceWriter.Size() {
return ErrProtocol.New("Size of finished piece does not match size declared by uplink! %d != %d",
message.Done.PieceSize, pieceWriter.Size())
// The header stored with the piece records the hash, the uplink's timestamp
// and signature, and the order limit.
info := &pb.PieceHeader{
Hash: calculatedHash,
CreationTime: message.Done.Timestamp,
Signature: message.Done.GetSignature(),
OrderLimit: *limit,
if err := pieceWriter.Commit(ctx, info); err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
if !limit.PieceExpiration.IsZero() {
// NOTE(review): the callee in front of ", limit.SatelliteId" is missing from
// this listing; presumably it records the piece expiration — confirm.
err :=, limit.SatelliteId, limit.PieceId, limit.PieceExpiration)
if err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
// Sign the storage node's own view of the stored piece and return it.
storageNodeHash, err := signing.SignPieceHash(ctx, endpoint.signer, &pb.PieceHash{
PieceId: limit.PieceId,
Hash: calculatedHash,
PieceSize: pieceWriter.Size(),
Timestamp: time.Now(),
if err != nil {
return ErrInternal.Wrap(err)
closeErr := stream.SendAndClose(&pb.PieceUploadResponse{
Done: storageNodeHash,
// io.EOF from SendAndClose is not treated as a failure.
return ErrProtocol.Wrap(ignoreEOF(closeErr))
// Download implements downloading a piece from piece store.
//
// Visible flow:
//   - the first stream message must carry both an order limit and a chunk
//     request; the limit's action must be GET, GET_REPAIR or GET_AUDIT, the
//     requested chunk size must not exceed limit.Limit, and the limit must
//     pass verifyOrderLimit;
//   - a piece reader is opened; a missing piece is reported as
//     codes.NotFound, any other failure as codes.Internal;
//   - a sending goroutine streams the requested range in pieces of at most
//     1 MiB, but only as far as the throttle allows;
//   - the receiving loop verifies each incoming order and feeds the newly
//     allowed amount into the throttle, so data is only sent after the uplink
//     has sent an order covering it.
//
// The largest verified order is handed to saveOrder when the receive loop
// exits. The result combines the errors of both sides.
//
// NOTE(review): this listing appears to have lost lines during extraction
// (closing braces, bare statements, and the call expression at the start of
// the `pieceReader, err =, ...` statement), so it does not compile as shown.
// Restore the block from version control before changing it.
func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err error) {
ctx := stream.Context()
defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err)
// Count this request as live for its whole duration.
atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
startTime := time.Now().UTC()
// TODO: set connection timeouts
// TODO: set maximum message size
var message *pb.PieceDownloadRequest
// receive limit and chunk from uplink
message, err = stream.Recv()
if err != nil {
return ErrProtocol.Wrap(err)
if message.Limit == nil || message.Chunk == nil {
return ErrProtocol.New("expected order limit and chunk as the first message")
limit, chunk := message.Limit, message.Chunk
endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
if limit.Action != pb.PieceAction_GET && limit.Action != pb.PieceAction_GET_REPAIR && limit.Action != pb.PieceAction_GET_AUDIT {
return ErrProtocol.New("expected get or get repair or audit action got %v", limit.Action) // TODO: report grpc status unauthorized or bad request
if chunk.ChunkSize > limit.Limit {
return ErrProtocol.New("requested more that order limit allows, limit=%v requested=%v", limit.Limit, chunk.ChunkSize)
if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
return Error.Wrap(err) // TODO: report grpc status unauthorized or bad request
var pieceReader *pieces.Reader
// Deferred bookkeeping: computes size, rate and duration of the download and
// logs the outcome based on the named result err.
// NOTE(review): downloadRate and downloadDuration are assigned but not read in
// the visible code; presumably the statements consuming them were lost — confirm.
defer func() {
endTime := time.Now().UTC()
dt := endTime.Sub(startTime)
downloadSize := int64(0)
if pieceReader != nil {
downloadSize = pieceReader.Size()
downloadRate := float64(0)
if dt.Seconds() > 0 {
downloadRate = float64(downloadSize) / dt.Seconds()
downloadDuration := dt.Nanoseconds()
if err != nil {
endpoint.log.Info("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
} else {
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
// NOTE(review): the callee in front of ", limit.SatelliteId" is missing from
// this listing; presumably it opens a piece reader on endpoint.store — confirm.
pieceReader, err =, limit.SatelliteId, limit.PieceId)
if err != nil {
if os.IsNotExist(err) {
return status.Error(codes.NotFound, err.Error())
return status.Error(codes.Internal, err.Error())
defer func() {
err := pieceReader.Close() // similar to how a transaction Rollback works
if err != nil {
// no reason to report this error to the uplink
endpoint.log.Error("failed to close piece reader", zap.Error(err))
// TODO: verify chunk.Size behavior logic with regards to reading all
if chunk.Offset+chunk.ChunkSize > pieceReader.Size() {
return Error.New("requested more data than available, requesting=%v available=%v", chunk.Offset+chunk.ChunkSize, pieceReader.Size())
// The bandwidth budget is read once here and decremented per order below.
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
if err != nil {
return ErrInternal.Wrap(err)
// The throttle couples the two sides: the receive loop produces allowance,
// the sending goroutine consumes it.
throttle := sync2.NewThrottle()
// TODO: see whether this can be implemented without a goroutine
group, ctx := errgroup.WithContext(ctx)
// Sending side: streams the requested range as allowance becomes available.
group.Go(func() (err error) {
var maximumChunkSize = 1 * memory.MiB.Int64()
currentOffset := chunk.Offset
unsentAmount := chunk.ChunkSize
for unsentAmount > 0 {
tryToSend := min(unsentAmount, maximumChunkSize)
// TODO: add timeout here
chunkSize, err := throttle.ConsumeOrWait(tryToSend)
if err != nil {
// this can happen only because uplink decided to close the connection
return nil
chunkData := make([]byte, chunkSize)
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
if err != nil {
return ErrInternal.Wrap(err)
// ReadFull is required to ensure we are sending the right amount of data.
_, err = io.ReadFull(pieceReader, chunkData)
if err != nil {
return ErrInternal.Wrap(err)
err = stream.Send(&pb.PieceDownloadResponse{
Chunk: &pb.PieceDownloadResponse_Chunk{
Offset: currentOffset,
Data: chunkData,
if err != nil {
// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
// no need to propagate it
return ErrProtocol.Wrap(ignoreEOF(err))
currentOffset += chunkSize
unsentAmount -= chunkSize
return nil
// Receiving side: runs on the calling goroutine and verifies incoming orders.
recvErr := func() (err error) {
largestOrder := pb.Order{}
defer endpoint.saveOrder(ctx, limit, &largestOrder)
// ensure that we always terminate sending goroutine
defer throttle.Fail(io.EOF)
for {
// TODO: check errors
// TODO: add timeout here
message, err = stream.Recv()
if err != nil {
// err is io.EOF when uplink closed the connection, no need to return error
return ErrProtocol.Wrap(ignoreEOF(err))
if message == nil || message.Order == nil {
return ErrProtocol.New("expected order as the message")
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
return err
// Only the increase over the previous order grants new allowance.
chunkSize := message.Order.Amount - largestOrder.Amount
availableBandwidth -= chunkSize
if availableBandwidth < 0 {
return ErrProtocol.New("out of bandwidth")
if err := throttle.Produce(chunkSize); err != nil {
// shouldn't happen since only receiving side is calling Fail
return ErrInternal.Wrap(err)
largestOrder = *message.Order
// ensure we wait for sender to complete
sendErr := group.Wait()
return Error.Wrap(errs.Combine(sendErr, recvErr))
// saveOrder saves the order with all necessary information. It assumes it has been already verified.
func (endpoint *Endpoint) saveOrder(ctx context.Context, limit *pb.OrderLimit, order *pb.Order) {
var err error
defer mon.Task()(&ctx)(&err)
// TODO: do this in a goroutine
if order == nil || order.Amount <= 0 {
err = endpoint.orders.Enqueue(ctx, &orders.Info{
Limit: limit,
Order: order,
if err != nil {
endpoint.log.Error("failed to add order", zap.Error(err))
} else {
err = endpoint.usage.Add(ctx, limit.SatelliteId, limit.Action, order.Amount, time.Now())
if err != nil {
endpoint.log.Error("failed to add bandwidth usage", zap.Error(err))
// ------------------------------------------------------------------------------------------------
// On the correctness of using access.ModTime() in place of the more precise access.CreationTime()
// in Retain():
// ------------------------------------------------------------------------------------------------
// Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can
// only be retrieved by opening the piece file, and reading and unmarshaling the piece header.
// This is far slower than access.ModTime(), which gets the file modification time from the file
// system and only needs to do a stat(2) on the piece file. If we can make Retain() work with
// ModTime, we should.
// Possibility of mismatch: We do not force or require piece file modification times to be equal to
// or close to the CreationTime specified by the uplink, but we do expect that piece files will be
// written to the filesystem _after_ the CreationTime. We make the assumption already that storage
// nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they
// are out of sync with each other by less than an hour of real time, or whatever is configured as
// RetainTimeBuffer). So if an uplink is not lying about CreationTime and it uploads a piece that
// makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks
// case we can assume that `ModTime > (CreationTime - RetainTimeBuffer)`. We also allow for storage
// node operators doing file system manipulations after a piece has been written. If piece files
// are copied between volumes and their attributes are not preserved, it will be possible for their
// modification times to be changed to something later in time. This still preserves the inequality
// relationship mentioned above, `ModTime > (CreationTime - RetainTimeBuffer)`. We only stipulate
// that storage node operators must not artificially change blob file modification times to be in
// the past.
// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no
// effect. In certain remaining cases, the only effect is that a piece file which _should_ be
// garbage collected survives until the next round of garbage collection. The only really
// problematic case is when there is a relatively new piece file which was created _after_ this
// node's Retain bloom filter started being built on the satellite, and is recorded in this
// storage node's blob store before the Retain operation has completed. Then, it might be possible
// for that new piece to be garbage collected incorrectly, because it does not show up in the
// bloom filter and the node incorrectly thinks that it was created before the bloom filter.
// But if the uplink is not lying about CreationTime and its clock drift versus the storage node
// is less than `RetainTimeBuffer`, and the ModTime on a blob file is correctly set from the
// storage node system time, then it is still true that `ModTime > (CreationTime -
// RetainTimeBuffer)`.
// The rule that storage node operators need to be aware of is only this: do not artificially set
// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to
// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect
// of this rule is that if the storage node's system clock needs to be changed forward by a
// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval,
// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of
// data).
// Retain keeps only piece ids specified in the request.
//
// Visible flow:
//   - returns immediately with an empty response when config.RetainStatus is
//     RetainDisabled;
//   - takes the caller's identity from ctx (codes.Unauthenticated on failure)
//     and checks it (codes.PermissionDenied on failure);
//   - builds a bloom filter from the request (codes.InvalidArgument on failure);
//   - walks the caller's stored pieces and considers only those whose file
//     modification time is before the request's creation date minus
//     config.RetainTimeBuffer; of those, pieces not contained in the filter
//     are logged, and deleted only when RetainStatus is RetainEnabled.
//
// Errors for individual pieces (mtime lookup, delete) are logged and the walk
// continues.
//
// NOTE(review): this listing appears to have lost lines during extraction
// (closing braces, bare statements, and the call expression at the start of
// statements of the form `err =, args)`), so it does not compile as shown.
// Restore the block from version control before changing it.
func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest) (res *pb.RetainResponse, err error) {
defer mon.Task()(&ctx)(&err)
// if retain status is disabled, quit immediately
if endpoint.config.RetainStatus == RetainDisabled {
return &pb.RetainResponse{}, nil
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return nil, status.Error(codes.Unauthenticated, Error.Wrap(err).Error())
// NOTE(review): the callee in front of ", peer.ID" is missing from this
// listing; presumably a satellite trust check on endpoint.trust — confirm.
err =, peer.ID)
if err != nil {
return nil, status.Error(codes.PermissionDenied, Error.New("retain called with untrusted ID").Error())
filter, err := bloomfilter.NewFromBytes(retainReq.GetFilter())
if err != nil {
return nil, status.Error(codes.InvalidArgument, Error.Wrap(err).Error())
// NOTE(review): numDeleted is never incremented in the visible code, so the
// final debug message would always report 0; presumably the increment was
// lost in extraction — confirm.
numDeleted := 0
// subtract some time to leave room for clock difference between the satellite and storage node
createdBefore := retainReq.GetCreationDate().Add(-endpoint.config.RetainTimeBuffer)
endpoint.log.Info("Prepared to run a Retain request.",
zap.Time("createdBefore", createdBefore),
zap.Int64("filterSize", filter.Size()),
zap.String("satellite", peer.ID.String()))
// NOTE(review): the callee in front of ", peer.ID, func(" is missing from this
// listing; presumably a walk over this satellite's pieces on endpoint.store — confirm.
err =, peer.ID, func(access pieces.StoredPieceAccess) error {
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
// so other goroutines can continue serving requests.
defer runtime.Gosched()
// See the comment above the Retain() function for a discussion on the correctness
// of using ModTime in place of the more precise CreationTime.
mTime, err := access.ModTime(ctx)
if err != nil {
endpoint.log.Error("failed to determine mtime of blob", zap.Error(err))
// but continue iterating.
return nil
// Pieces modified at or after the cutoff are never candidates for deletion.
if !mTime.Before(createdBefore) {
return nil
pieceID := access.PieceID()
if !filter.Contains(pieceID) {
endpoint.log.Debug("About to delete piece id",
zap.String("satellite", peer.ID.String()),
zap.String("pieceID", pieceID.String()),
zap.String("retainStatus", endpoint.config.RetainStatus.String()))
// if retain status is enabled, delete pieceid
if endpoint.config.RetainStatus == RetainEnabled {
// NOTE(review): the callee in front of ", peer.ID, pieceID" is missing from
// this listing; presumably a delete call on endpoint.store — confirm.
if err =, peer.ID, pieceID); err != nil {
endpoint.log.Error("failed to delete piece",
zap.String("satellite", peer.ID.String()),
zap.String("pieceID", pieceID.String()),
return nil
return nil
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
endpoint.log.Sugar().Debugf("Deleted %d pieces during retain. RetainStatus: %s", numDeleted, endpoint.config.RetainStatus.String())
return &pb.RetainResponse{}, nil
// min returns the smaller of a and b; when they are equal, that value is returned.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}
// ignoreEOF ignores io.EOF error.
//
// It returns nil when err is exactly io.EOF and returns err unchanged
// otherwise (including nil). The comparison is by identity, so an io.EOF
// wrapped inside another error is not ignored.
func ignoreEOF(err error) error {
	if err == io.EOF {
		return nil
	}
	return err
}