
494 lines
18 KiB
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
const (
preallocSize = 4 * memory.MiB
var (
// Error is the default error class.
Error = errs.Class("pieces error")
mon = monkit.Package()
// Info contains all the information we need to know about a Piece to manage them.
type Info struct {
SatelliteID storj.NodeID
PieceID storj.PieceID
PieceSize int64
PieceCreation time.Time
PieceExpiration time.Time
OrderLimit *pb.OrderLimit
UplinkPieceHash *pb.PieceHash
2019-05-08 12:11:59 +01:00
// ExpiredInfo is a fully namespaced piece id
type ExpiredInfo struct {
SatelliteID storj.NodeID
PieceID storj.PieceID
// This can be removed when we no longer need to support the pieceinfo db. Its only purpose
// is to keep track of whether expired entries came from piece_expirations or pieceinfo.
InPieceInfo bool
2019-05-08 12:11:59 +01:00
// PieceExpirationDB stores information about pieces with expiration dates.
type PieceExpirationDB interface {
// GetExpired gets piece IDs that expire or have expired before the given time
GetExpired(ctx context.Context, expiresBefore time.Time, limit int64) ([]ExpiredInfo, error)
// SetExpiration sets an expiration time for the given piece ID on the given satellite
SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error
// DeleteExpiration removes an expiration record for the given piece ID on the given satellite
DeleteExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (found bool, err error)
// DeleteFailed marks an expiration record as having experienced a failure in deleting the
// piece from the disk
DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
// V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where
// metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored
// behind something providing the storage.Blobs interface.
type V0PieceInfoDB interface {
// Get returns Info about a piece.
Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error)
// Delete deletes Info about a piece.
Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
2019-05-08 12:11:59 +01:00
// DeleteFailed marks piece deletion from disk failed
DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
// GetExpired gets piece IDs stored with storage format V0 that expire or have expired
// before the given time
2019-05-08 12:11:59 +01:00
GetExpired(ctx context.Context, expiredAt time.Time, limit int64) ([]ExpiredInfo, error)
// WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored
// with storage format V0 in the namespace of the given satellite. If walkFunc returns a
// non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error
// immediately. The ctx parameter is intended specifically to allow canceling iteration
// early.
WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error
// V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so
// that test environments with V0 piece data can be set up.
type V0PieceInfoDBForTest interface {
// Add inserts Info to the database. This is only a valid thing to do, now,
// during tests, to replicate the environment of a storage node not yet fully
// migrated to V1 storage.
Add(context.Context, *Info) error
// StoredPieceAccess allows inspection and manipulation of a piece during iteration with
// WalkSatellitePieces-type methods.
type StoredPieceAccess interface {
// PieceID gives the pieceID of the piece
PieceID() storj.PieceID
// Satellite gives the nodeID of the satellite which owns the piece
Satellite() (storj.NodeID, error)
// ContentSize gives the size of the piece content (not including the piece header, if
// applicable)
ContentSize(ctx context.Context) (int64, error)
// CreationTime returns the piece creation time as given in the original PieceHash (which is
// likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening
// the file and unmarshaling the piece header. If exact precision is not required, ModTime()
// may be a better solution.
CreationTime(ctx context.Context) (time.Time, error)
// ModTime returns a less-precise piece creation time than CreationTime, but is generally
// much faster. For non-FormatV0 pieces, this gets the piece creation time from to the
// filesystem instead of the piece header.
ModTime(ctx context.Context) (time.Time, error)
// Store implements storing pieces onto a blob storage implementation.
type Store struct {
log *zap.Logger
blobs storage.Blobs
v0PieceInfo V0PieceInfoDB
expirationInfo PieceExpirationDB
// The value of reservedSpace is always added to the return value from the
// SpaceUsedForPieces() method.
// The reservedSpace field is part of an unfortunate hack that enables testing of low-space
// or no-space conditions. It is not (or should not be) used under regular operating
// conditions.
reservedSpace int64
// StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing
// pieces with older storage formats and allows use of the ReserveSpace() method.
type StoreForTest struct {
// NewStore creates a new piece store
func NewStore(log *zap.Logger, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB) *Store {
return &Store{
log: log,
blobs: blobs,
v0PieceInfo: v0PieceInfo,
expirationInfo: expirationInfo,
// Writer returns a new piece writer.
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error) {
defer mon.Task()(&ctx)(&err)
blob, err := store.blobs.Create(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
}, preallocSize.Int64())
if err != nil {
return nil, Error.Wrap(err)
writer, err := NewWriter(blob)
return writer, Error.Wrap(err)
// WriterForFormatVersion allows opening a piece writer with a specified storage format version.
// This is meant to be used externally only in test situations (thus the StoreForTest receiver
// type).
func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Writer, err error) {
defer mon.Task()(&ctx)(&err)
blobRef := storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
var blob storage.BlobWriter
switch formatVersion {
case filestore.FormatV0:
fStore, ok := store.blobs.(*filestore.Store)
if !ok {
return nil, Error.New("can't make a WriterForFormatVersion with this blob store (%T)", store.blobs)
tStore := filestore.StoreForTest{Store: fStore}
blob, err = tStore.CreateV0(ctx, blobRef)
case filestore.FormatV1:
blob, err = store.blobs.Create(ctx, blobRef, preallocSize.Int64())
return nil, Error.New("please teach me how to make V%d pieces", formatVersion)
if err != nil {
return nil, Error.Wrap(err)
writer, err := NewWriter(blob)
return writer, Error.Wrap(err)
// Reader returns a new piece reader.
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) {
defer mon.Task()(&ctx)(&err)
blob, err := store.blobs.Open(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
if err != nil {
if os.IsNotExist(err) {
return nil, err
return nil, Error.Wrap(err)
reader, err := NewReader(blob)
return reader, Error.Wrap(err)
// ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the
// potential need to check multiple storage formats to find the right blob.
func (store *Store) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error) {
defer mon.Task()(&ctx)(&err)
ref := storage.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()}
blob, err := store.blobs.OpenWithStorageFormat(ctx, ref, formatVersion)
if err != nil {
if os.IsNotExist(err) {
return nil, err
return nil, Error.Wrap(err)
reader, err := NewReader(blob)
return reader, Error.Wrap(err)
// Delete deletes the specified piece.
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.blobs.Delete(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
if err != nil {
return Error.Wrap(err)
// delete records in both the piece_expirations and pieceinfo DBs, wherever we find it.
// both of these calls should return no error if the requested record is not found.
if store.expirationInfo != nil {
_, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID)
if store.v0PieceInfo != nil {
err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID))
return Error.Wrap(err)
// GetV0PieceInfoDB returns this piece-store's reference to the V0 piece info DB (or nil,
// if this piece-store does not have one). This is ONLY intended for use with testing
// functionality.
func (store *Store) GetV0PieceInfoDB() V0PieceInfoDB {
return store.v0PieceInfo
// WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the
// given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating
// and return the error immediately. The ctx parameter is intended specifically to allow canceling
// iteration early.
// Note that this method includes all locally stored pieces, both V0 and higher.
func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error) {
defer mon.Task()(&ctx)(&err)
// first iterate over all in V1 storage, then all in V0
err = store.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo storage.BlobInfo) error {
if blobInfo.StorageFormatVersion() < filestore.FormatV1 {
// we'll address this piece while iterating over the V0 pieces below.
return nil
pieceAccess, err := newStoredPieceAccess(store, blobInfo)
if err != nil {
// this is not a real piece blob. the blob store can't distinguish between actual piece
// blobs and stray files whose names happen to decode as valid base32. skip this
// "blob".
return nil
return walkFunc(pieceAccess)
if err == nil && store.v0PieceInfo != nil {
err = store.v0PieceInfo.WalkSatelliteV0Pieces(ctx, store.blobs, satellite, walkFunc)
return err
// GetExpired gets piece IDs that are expired and were created before the given time
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
expired, err := store.expirationInfo.GetExpired(ctx, expiredAt, limit)
if err != nil {
return nil, err
if int64(len(expired)) < limit && store.v0PieceInfo != nil {
v0Expired, err := store.v0PieceInfo.GetExpired(ctx, expiredAt, limit-int64(len(expired)))
if err != nil {
return nil, err
expired = append(expired, v0Expired...)
return expired, nil
// SetExpiration records an expiration time for the specified piece ID owned by the specified satellite
func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) (err error) {
return store.expirationInfo.SetExpiration(ctx, satellite, pieceID, expiresAt)
// DeleteFailed marks piece as a failed deletion.
func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
if expired.InPieceInfo {
return store.v0PieceInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
return store.expirationInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
// SpaceUsedForPieces returns *an approximation of* the disk space used by all local pieces (both
// V0 and later). This is an approximation because changes may be being applied to the filestore as
// this information is collected, and because it is possible that various errors in directory
// traversal could cause this count to be undersized.
// Important note: this metric does not include space used by piece headers, whereas
// storj/filestore/store.(*Store).SpaceUsed() *does* include all space used by the blobs.
// The value of reservedSpace for this Store is added to the result, but this should only affect
// tests (reservedSpace should always be 0 in real usage).
func (store *Store) SpaceUsedForPieces(ctx context.Context) (int64, error) {
satellites, err := store.getAllStoringSatellites(ctx)
if err != nil {
return 0, err
var total int64
for _, satellite := range satellites {
spaceUsed, err := store.SpaceUsedBySatellite(ctx, satellite)
if err != nil {
return 0, err
total += spaceUsed
return total + store.reservedSpace, nil
func (store *Store) getAllStoringSatellites(ctx context.Context) ([]storj.NodeID, error) {
namespaces, err := store.blobs.ListNamespaces(ctx)
if err != nil {
return nil, err
satellites := make([]storj.NodeID, len(namespaces))
for i, namespace := range namespaces {
satellites[i], err = storj.NodeIDFromBytes(namespace)
if err != nil {
return nil, err
return satellites, nil
// SpaceUsedBySatellite calculates *an approximation of* how much disk space is used for local
// piece storage in the given satellite's namespace. This is an approximation because changes may
// be being applied to the filestore as this information is collected, and because it is possible
// that various errors in directory traversal could cause this count to be undersized.
// Important note: this metric does not include space used by piece headers, whereas
// storj/filestore/store.(*Store).SpaceUsedInNamespace() *does* include all space used by the
// blobs.
func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error) {
var totalUsed int64
err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
contentSize, statErr := access.ContentSize(ctx)
if statErr != nil {
store.log.Error("failed to stat", zap.Error(statErr), zap.String("pieceID", access.PieceID().String()), zap.String("satellite", satelliteID.String()))
// keep iterating; we want a best effort total here.
return nil
totalUsed += contentSize
return nil
if err != nil {
return 0, err
return totalUsed, nil
// ReserveSpace marks some amount of free space as used, even if it's not, so that future calls
// to SpaceUsedForPieces() are raised by this amount. Calls to ReserveSpace invalidate earlier
// calls, so ReserveSpace(0) undoes all prior space reservation. This should only be used in
// test scenarios.
func (store StoreForTest) ReserveSpace(amount int64) {
store.reservedSpace = amount
// StorageStatus contains information about the disk store is using.
type StorageStatus struct {
DiskUsed int64
DiskFree int64
// StorageStatus returns information about the disk.
func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error) {
defer mon.Task()(&ctx)(&err)
diskFree, err := store.blobs.FreeSpace()
if err != nil {
return StorageStatus{}, err
return StorageStatus{
DiskUsed: -1, // TODO set value
DiskFree: diskFree,
}, nil
type storedPieceAccess struct {
store *Store
pieceID storj.PieceID
func newStoredPieceAccess(store *Store, blobInfo storage.BlobInfo) (storedPieceAccess, error) {
pieceID, err := storj.PieceIDFromBytes(blobInfo.BlobRef().Key)
if err != nil {
return storedPieceAccess{}, err
return storedPieceAccess{
BlobInfo: blobInfo,
store: store,
pieceID: pieceID,
}, nil
// PieceID returns the piece ID of the piece
func (access storedPieceAccess) PieceID() storj.PieceID {
return access.pieceID
// Satellite returns the satellite ID that owns the piece
func (access storedPieceAccess) Satellite() (storj.NodeID, error) {
return storj.NodeIDFromBytes(access.BlobRef().Namespace)
// ContentSize gives the size of the piece content (not including the piece header, if applicable)
func (access storedPieceAccess) ContentSize(ctx context.Context) (size int64, err error) {
defer mon.Task()(&ctx)(&err)
stat, err := access.Stat(ctx)
if err != nil {
return 0, err
size = stat.Size()
if access.StorageFormatVersion() >= filestore.FormatV1 {
size -= V1PieceHeaderReservedArea
return size, nil
// CreationTime returns the piece creation time as given in the original PieceHash (which is likely
// not the same as the file mtime). This requires opening the file and unmarshaling the piece
// header. If exact precision is not required, ModTime() may be a better solution.
func (access storedPieceAccess) CreationTime(ctx context.Context) (cTime time.Time, err error) {
defer mon.Task()(&ctx)(&err)
satellite, err := access.Satellite()
if err != nil {
return time.Time{}, err
reader, err :=, satellite, access.PieceID(), access.StorageFormatVersion())
if err != nil {
return time.Time{}, err
header, err := reader.GetPieceHeader()
if err != nil {
return time.Time{}, err
return header.CreationTime, nil
// ModTime returns a less-precise piece creation time than CreationTime, but is generally
// much faster. This gets the piece creation time from to the filesystem instead of the
// piece header.
func (access storedPieceAccess) ModTime(ctx context.Context) (mTime time.Time, err error) {
defer mon.Task()(&ctx)(&err)
stat, err := access.Stat(ctx)
if err != nil {
return time.Time{}, err
return stat.ModTime(), nil