2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
2022-11-14 11:14:55 +00:00
"database/sql"
2019-11-04 16:59:45 +00:00
"io"
2019-06-03 10:17:09 +01:00
"os"
2019-03-18 10:55:06 +00:00
"time"
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-03-18 10:55:06 +00:00
"github.com/zeebo/errs"
"go.uber.org/zap"
2023-04-13 21:10:53 +01:00
"storj.io/common/bloomfilter"
2019-12-27 11:48:47 +00:00
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
2023-04-05 18:03:06 +01:00
"storj.io/storj/storagenode/blobstore"
"storj.io/storj/storagenode/blobstore/filestore"
2023-02-28 13:54:01 +00:00
"storj.io/storj/storagenode/pieces/lazyfilewalker"
2019-03-18 10:55:06 +00:00
)
2019-06-04 13:31:39 +01:00
var (
	// Error is the default error class used by the pieces package.
	Error = errs.Class("pieces error")

	mon = monkit.Package()
)
2019-03-18 10:55:06 +00:00
// Info contains all the information we need to know about a Piece to manage them.
type Info struct {
SatelliteID storj . NodeID
PieceID storj . PieceID
PieceSize int64
2019-07-09 22:54:00 +01:00
PieceCreation time . Time
PieceExpiration time . Time
2019-03-18 10:55:06 +00:00
2019-07-11 21:51:40 +01:00
OrderLimit * pb . OrderLimit
2019-03-18 10:55:06 +00:00
UplinkPieceHash * pb . PieceHash
}
2020-07-16 15:18:02 +01:00
// ExpiredInfo is a fully namespaced piece id.
2019-05-08 12:11:59 +01:00
type ExpiredInfo struct {
SatelliteID storj . NodeID
PieceID storj . PieceID
2019-08-08 02:47:30 +01:00
// This can be removed when we no longer need to support the pieceinfo db. Its only purpose
// is to keep track of whether expired entries came from piece_expirations or pieceinfo.
InPieceInfo bool
2019-05-08 12:11:59 +01:00
}
2019-08-08 02:47:30 +01:00
// PieceExpirationDB stores information about pieces with expiration dates.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-08-08 02:47:30 +01:00
type PieceExpirationDB interface {
// GetExpired gets piece IDs that expire or have expired before the given time
GetExpired ( ctx context . Context , expiresBefore time . Time , limit int64 ) ( [ ] ExpiredInfo , error )
// SetExpiration sets an expiration time for the given piece ID on the given satellite
SetExpiration ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID , expiresAt time . Time ) error
// DeleteExpiration removes an expiration record for the given piece ID on the given satellite
DeleteExpiration ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( found bool , err error )
// DeleteFailed marks an expiration record as having experienced a failure in deleting the
// piece from the disk
DeleteFailed ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID , failedAt time . Time ) error
2019-11-20 16:28:49 +00:00
// Trash marks a piece as in the trash
Trash ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID ) error
// RestoreTrash marks all piece as not being in trash
RestoreTrash ( ctx context . Context , satelliteID storj . NodeID ) error
2019-08-08 02:47:30 +01:00
}
// V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where
// metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored
2023-04-05 18:03:06 +01:00
// behind something providing the blobstore.Blobs interface.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-08-08 02:47:30 +01:00
type V0PieceInfoDB interface {
2019-03-18 10:55:06 +00:00
// Get returns Info about a piece.
Get ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID ) ( * Info , error )
// Delete deletes Info about a piece.
Delete ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID ) error
2019-05-08 12:11:59 +01:00
// DeleteFailed marks piece deletion from disk failed
DeleteFailed ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID , failedAt time . Time ) error
2019-08-08 02:47:30 +01:00
// GetExpired gets piece IDs stored with storage format V0 that expire or have expired
// before the given time
2019-05-08 12:11:59 +01:00
GetExpired ( ctx context . Context , expiredAt time . Time , limit int64 ) ( [ ] ExpiredInfo , error )
2019-08-08 02:47:30 +01:00
// WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored
// with storage format V0 in the namespace of the given satellite. If walkFunc returns a
// non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error
// immediately. The ctx parameter is intended specifically to allow canceling iteration
// early.
2023-04-05 18:03:06 +01:00
WalkSatelliteV0Pieces ( ctx context . Context , blobStore blobstore . Blobs , satellite storj . NodeID , walkFunc func ( StoredPieceAccess ) error ) error
2019-08-08 02:47:30 +01:00
}
// V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so
// that test environments with V0 piece data can be set up.
type V0PieceInfoDBForTest interface {
	V0PieceInfoDB

	// Add inserts Info to the database. This is only a valid thing to do, now,
	// during tests, to replicate the environment of a storage node not yet fully
	// migrated to V1 storage.
	Add(context.Context, *Info) error
}
2020-12-05 16:01:42 +00:00
// PieceSpaceUsedDB stores the most recent totals from the space used cache.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-08-12 22:43:05 +01:00
type PieceSpaceUsedDB interface {
2019-12-21 13:11:24 +00:00
// Init creates the one total and trash record if it doesn't already exist
2019-08-12 22:43:05 +01:00
Init ( ctx context . Context ) error
2020-01-07 23:34:51 +00:00
// GetPieceTotals returns the space used (total and contentSize) by all pieces stored
GetPieceTotals ( ctx context . Context ) ( piecesTotal int64 , piecesContentSize int64 , err error )
// UpdatePieceTotals updates the record for aggregate spaced used for pieces (total and contentSize) with new values
UpdatePieceTotals ( ctx context . Context , piecesTotal , piecesContentSize int64 ) error
// GetTotalsForAllSatellites returns how much total space used by pieces stored for each satelliteID
GetPieceTotalsForAllSatellites ( ctx context . Context ) ( map [ storj . NodeID ] SatelliteUsage , error )
2019-12-21 13:11:24 +00:00
// UpdatePieceTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID
2020-01-07 23:34:51 +00:00
UpdatePieceTotalsForAllSatellites ( ctx context . Context , newTotalsBySatellites map [ storj . NodeID ] SatelliteUsage ) error
2019-12-21 13:11:24 +00:00
// GetTrashTotal returns the total space used by trash
GetTrashTotal ( ctx context . Context ) ( int64 , error )
// UpdateTrashTotal updates the record for total spaced used for trash with a new value
UpdateTrashTotal ( ctx context . Context , newTotal int64 ) error
2019-08-12 22:43:05 +01:00
}
2019-08-08 02:47:30 +01:00
// StoredPieceAccess allows inspection and manipulation of a piece during iteration with
// WalkSatellitePieces-type methods.
type StoredPieceAccess interface {
2023-04-05 18:03:06 +01:00
blobstore . BlobInfo
2019-08-08 02:47:30 +01:00
// PieceID gives the pieceID of the piece
PieceID ( ) storj . PieceID
// Satellite gives the nodeID of the satellite which owns the piece
Satellite ( ) ( storj . NodeID , error )
2019-12-21 13:11:24 +00:00
// Size gives the size of the piece on disk, and the size of the piece
// content (not including the piece header, if applicable)
Size ( ctx context . Context ) ( int64 , int64 , error )
2019-08-08 02:47:30 +01:00
// CreationTime returns the piece creation time as given in the original PieceHash (which is
// likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening
// the file and unmarshaling the piece header. If exact precision is not required, ModTime()
// may be a better solution.
CreationTime ( ctx context . Context ) ( time . Time , error )
// ModTime returns a less-precise piece creation time than CreationTime, but is generally
// much faster. For non-FormatV0 pieces, this gets the piece creation time from to the
// filesystem instead of the piece header.
ModTime ( ctx context . Context ) ( time . Time , error )
2019-03-18 10:55:06 +00:00
}
2020-07-16 15:18:02 +01:00
// SatelliteUsage contains information of how much space is used by a satellite.
type SatelliteUsage struct {
	Total       int64 // the total space used (including headers)
	ContentSize int64 // only content size used (excluding things like headers)
}
2020-04-14 13:39:42 +01:00
// Config is configuration for Store.
type Config struct {
WritePreallocSize memory . Size ` help:"file preallocated for uploading" default:"4MiB" `
2021-04-01 15:10:08 +01:00
DeleteToTrash bool ` help:"move pieces to trash upon deletion. Warning: if set to false, you risk disqualification for failed audits if a satellite database is restored from backup." default:"true" `
2023-02-28 13:54:01 +00:00
// TODO(clement): default is set to false for now.
// I will test and monitor on my node for some time before changing the default to true.
EnableLazyFilewalker bool ` help:"run garbage collection and used-space calculation filewalkers as a separate subprocess with lower IO priority" releaseDefault:"false" devDefault:"true" testDefault:"false" `
2020-04-14 13:39:42 +01:00
}
// DefaultConfig is the default value for the Config.
var DefaultConfig = Config{
	WritePreallocSize: 4 * memory.MiB,
}
2019-03-18 10:55:06 +00:00
// Store implements storing pieces onto a blob storage implementation.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-03-18 10:55:06 +00:00
type Store struct {
2020-04-14 13:39:42 +01:00
log * zap . Logger
config Config
2023-04-05 18:03:06 +01:00
blobs blobstore . Blobs
2019-08-08 02:47:30 +01:00
expirationInfo PieceExpirationDB
2019-08-12 22:43:05 +01:00
spaceUsedDB PieceSpaceUsedDB
2023-03-01 03:59:53 +00:00
v0PieceInfo V0PieceInfoDB
2023-02-28 13:54:01 +00:00
Filewalker * FileWalker
lazyFilewalker * lazyfilewalker . Supervisor
2019-08-08 02:47:30 +01:00
}
// StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing
2020-07-16 15:18:02 +01:00
// pieces with older storage formats.
2019-08-08 02:47:30 +01:00
type StoreForTest struct {
* Store
2019-03-18 10:55:06 +00:00
}
2020-07-16 15:18:02 +01:00
// NewStore creates a new piece store.
2023-02-28 13:54:01 +00:00
func NewStore ( log * zap . Logger , fw * FileWalker , lazyFilewalker * lazyfilewalker . Supervisor , blobs blobstore . Blobs , v0PieceInfo V0PieceInfoDB , expirationInfo PieceExpirationDB , spaceUsedDB PieceSpaceUsedDB , config Config ) * Store {
2019-03-18 10:55:06 +00:00
return & Store {
2019-08-08 02:47:30 +01:00
log : log ,
2020-04-14 13:39:42 +01:00
config : config ,
2019-08-08 02:47:30 +01:00
blobs : blobs ,
expirationInfo : expirationInfo ,
2023-03-01 03:59:53 +00:00
spaceUsedDB : spaceUsedDB ,
v0PieceInfo : v0PieceInfo ,
Filewalker : fw ,
2023-02-28 13:54:01 +00:00
lazyFilewalker : lazyFilewalker ,
2019-03-18 10:55:06 +00:00
}
}
2020-07-10 20:36:39 +01:00
// CreateVerificationFile creates a file to be used for storage directory verification.
2021-09-10 14:05:29 +01:00
func ( store * Store ) CreateVerificationFile ( ctx context . Context , id storj . NodeID ) error {
return store . blobs . CreateVerificationFile ( ctx , id )
2020-07-10 20:36:39 +01:00
}
// VerifyStorageDir verifies that the storage directory is correct by checking for the existence and validity
// of the verification file.
2021-09-10 14:05:29 +01:00
func ( store * Store ) VerifyStorageDir ( ctx context . Context , id storj . NodeID ) error {
return store . blobs . VerifyStorageDir ( ctx , id )
2020-07-10 20:36:39 +01:00
}
2023-03-14 11:09:03 +00:00
// VerifyStorageDirWithTimeout verifies that the storage directory is correct by checking for the
// existence and validity of the verification file. It uses the provided timeout for the operation.
func (store *Store) VerifyStorageDirWithTimeout(ctx context.Context, id storj.NodeID, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Run the verification concurrently so we can race it against the timeout.
	// The buffer lets the goroutine complete its send even when the timeout wins,
	// so it never leaks.
	result := make(chan error, 1)
	go func() {
		result <- store.VerifyStorageDir(ctx, id)
	}()

	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
2019-03-18 10:55:06 +00:00
// Writer returns a new piece writer.
2022-08-01 10:30:33 +01:00
func ( store * Store ) Writer ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID , hashAlgorithm pb . PieceHashAlgorithm ) ( _ * Writer , err error ) {
2019-06-04 13:31:39 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2023-04-05 18:03:06 +01:00
blobWriter , err := store . blobs . Create ( ctx , blobstore . BlobRef {
2019-03-18 10:55:06 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
2020-04-14 13:39:42 +01:00
} , store . config . WritePreallocSize . Int64 ( ) )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , Error . Wrap ( err )
}
2022-08-01 10:30:33 +01:00
writer , err := NewWriter ( store . log . Named ( "blob-writer" ) , blobWriter , store . blobs , satellite , hashAlgorithm )
2019-03-18 10:55:06 +00:00
return writer , Error . Wrap ( err )
}
2019-08-08 02:47:30 +01:00
// WriterForFormatVersion allows opening a piece writer with a specified storage format version.
// This is meant to be used externally only in test situations (thus the StoreForTest receiver
// type).
2020-12-05 16:01:42 +00:00
func ( store StoreForTest ) WriterForFormatVersion ( ctx context . Context , satellite storj . NodeID ,
2023-04-05 18:03:06 +01:00
pieceID storj . PieceID , formatVersion blobstore . FormatVersion , hashAlgorithm pb . PieceHashAlgorithm ) ( _ * Writer , err error ) {
2020-12-05 16:01:42 +00:00
2019-08-08 02:47:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2023-04-05 18:03:06 +01:00
blobRef := blobstore . BlobRef {
2019-08-08 02:47:30 +01:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
}
2023-04-05 18:03:06 +01:00
var blobWriter blobstore . BlobWriter
2019-08-08 02:47:30 +01:00
switch formatVersion {
case filestore . FormatV0 :
2019-08-12 22:43:05 +01:00
fStore , ok := store . blobs . ( interface {
2023-04-05 18:03:06 +01:00
TestCreateV0 ( ctx context . Context , ref blobstore . BlobRef ) ( _ blobstore . BlobWriter , err error )
2019-08-12 22:43:05 +01:00
} )
2019-08-08 02:47:30 +01:00
if ! ok {
return nil , Error . New ( "can't make a WriterForFormatVersion with this blob store (%T)" , store . blobs )
}
2019-08-12 22:43:05 +01:00
blobWriter , err = fStore . TestCreateV0 ( ctx , blobRef )
2019-08-08 02:47:30 +01:00
case filestore . FormatV1 :
2020-04-14 13:39:42 +01:00
blobWriter , err = store . blobs . Create ( ctx , blobRef , store . config . WritePreallocSize . Int64 ( ) )
2019-08-08 02:47:30 +01:00
default :
return nil , Error . New ( "please teach me how to make V%d pieces" , formatVersion )
}
if err != nil {
return nil , Error . Wrap ( err )
}
2022-08-01 10:30:33 +01:00
writer , err := NewWriter ( store . log . Named ( "blob-writer" ) , blobWriter , store . blobs , satellite , hashAlgorithm )
2019-08-08 02:47:30 +01:00
return writer , Error . Wrap ( err )
}
2023-03-01 03:59:53 +00:00
// ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the
// potential need to check multiple storage formats to find the right blob.
func ( store * StoreForTest ) ReaderWithStorageFormat ( ctx context . Context , satellite storj . NodeID ,
2023-04-05 18:03:06 +01:00
pieceID storj . PieceID , formatVersion blobstore . FormatVersion ) ( _ * Reader , err error ) {
2023-03-01 03:59:53 +00:00
2019-06-04 13:31:39 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2023-04-05 18:03:06 +01:00
ref := blobstore . BlobRef { Namespace : satellite . Bytes ( ) , Key : pieceID . Bytes ( ) }
2023-03-01 03:59:53 +00:00
blob , err := store . blobs . OpenWithStorageFormat ( ctx , ref , formatVersion )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-06-03 10:17:09 +01:00
if os . IsNotExist ( err ) {
return nil , err
}
2019-03-18 10:55:06 +00:00
return nil , Error . Wrap ( err )
}
2019-07-25 09:22:15 +01:00
reader , err := NewReader ( blob )
2019-03-18 10:55:06 +00:00
return reader , Error . Wrap ( err )
}
2023-03-01 03:59:53 +00:00
// Reader returns a new piece reader.
func ( store * Store ) Reader ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( _ * Reader , err error ) {
2019-08-08 02:47:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2023-04-05 18:03:06 +01:00
blob , err := store . blobs . Open ( ctx , blobstore . BlobRef {
2023-03-01 03:59:53 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} )
2019-08-08 02:47:30 +01:00
if err != nil {
if os . IsNotExist ( err ) {
return nil , err
}
return nil , Error . Wrap ( err )
}
reader , err := NewReader ( blob )
return reader , Error . Wrap ( err )
}
2019-03-18 10:55:06 +00:00
// Delete deletes the specified piece.
2019-06-04 13:31:39 +01:00
func ( store * Store ) Delete ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2023-04-05 18:03:06 +01:00
err = store . blobs . Delete ( ctx , blobstore . BlobRef {
2019-03-18 10:55:06 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} )
2019-08-08 02:47:30 +01:00
if err != nil {
return Error . Wrap ( err )
}
2019-08-12 22:43:05 +01:00
2022-09-08 12:08:49 +01:00
// delete expired piece records
err = store . DeleteExpired ( ctx , satellite , pieceID )
if err == nil {
store . log . Debug ( "deleted piece" , zap . String ( "Satellite ID" , satellite . String ( ) ) ,
zap . String ( "Piece ID" , pieceID . String ( ) ) )
}
return Error . Wrap ( err )
}
// DeleteExpired deletes records in both the piece_expirations and pieceinfo DBs, wherever we find it.
// Should return no error if the requested record is not found in any of the DBs.
func ( store * Store ) DeleteExpired ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-08 02:47:30 +01:00
if store . expirationInfo != nil {
_ , err = store . expirationInfo . DeleteExpiration ( ctx , satellite , pieceID )
}
if store . v0PieceInfo != nil {
err = errs . Combine ( err , store . v0PieceInfo . Delete ( ctx , satellite , pieceID ) )
}
2019-11-04 16:59:45 +00:00
return Error . Wrap ( err )
}
2020-07-08 11:50:40 +01:00
// DeleteSatelliteBlobs deletes blobs folder of specific satellite after successful GE.
func (store *Store) DeleteSatelliteBlobs(ctx context.Context, satellite storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	return Error.Wrap(store.blobs.DeleteNamespace(ctx, satellite.Bytes()))
}
2019-11-20 16:28:49 +00:00
// Trash moves the specified piece to the blob trash. If necessary, it converts
// the v0 piece to a v1 piece. It also marks the item as "trashed" in the
// pieceExpirationDB.
func ( store * Store ) Trash ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
// Check if the MaxFormatVersionSupported piece exists. If not, we assume
// this is an old piece version and attempt to migrate it.
2023-04-05 18:03:06 +01:00
_ , err = store . blobs . StatWithStorageFormat ( ctx , blobstore . BlobRef {
2019-11-20 16:28:49 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} , filestore . MaxFormatVersionSupported )
if err != nil && ! errs . IsFunc ( err , os . IsNotExist ) {
return Error . Wrap ( err )
}
if errs . IsFunc ( err , os . IsNotExist ) {
// MaxFormatVersionSupported does not exist, migrate.
err = store . MigrateV0ToV1 ( ctx , satellite , pieceID )
if err != nil {
2022-11-14 11:14:55 +00:00
if ! errs . Is ( err , sql . ErrNoRows ) {
return Error . Wrap ( err )
}
store . log . Warn ( "failed to migrate v0 piece. Piece may not be recoverable" )
2019-11-20 16:28:49 +00:00
}
}
err = store . expirationInfo . Trash ( ctx , satellite , pieceID )
2023-04-05 18:03:06 +01:00
err = errs . Combine ( err , store . blobs . Trash ( ctx , blobstore . BlobRef {
2019-11-20 16:28:49 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} ) )
return Error . Wrap ( err )
}
2020-07-16 15:18:02 +01:00
// EmptyTrash deletes pieces in the trash that have been in there longer than trashExpiryInterval.
2019-11-26 16:25:21 +00:00
func ( store * Store ) EmptyTrash ( ctx context . Context , satelliteID storj . NodeID , trashedBefore time . Time ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-12-21 13:11:24 +00:00
_ , deletedIDs , err := store . blobs . EmptyTrash ( ctx , satelliteID [ : ] , trashedBefore )
2019-11-26 16:25:21 +00:00
if err != nil {
return Error . Wrap ( err )
}
for _ , deletedID := range deletedIDs {
pieceID , pieceIDErr := storj . PieceIDFromBytes ( deletedID )
if pieceIDErr != nil {
return Error . Wrap ( pieceIDErr )
}
_ , deleteErr := store . expirationInfo . DeleteExpiration ( ctx , satelliteID , pieceID )
err = errs . Combine ( err , deleteErr )
}
return Error . Wrap ( err )
}
2020-07-16 15:18:02 +01:00
// RestoreTrash restores all pieces in the trash.
2019-11-20 16:28:49 +00:00
func ( store * Store ) RestoreTrash ( ctx context . Context , satelliteID storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-12-21 13:11:24 +00:00
_ , err = store . blobs . RestoreTrash ( ctx , satelliteID . Bytes ( ) )
2019-11-20 16:28:49 +00:00
if err != nil {
return Error . Wrap ( err )
}
return Error . Wrap ( store . expirationInfo . RestoreTrash ( ctx , satelliteID ) )
}
2019-11-04 16:59:45 +00:00
// MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage
// format v1. If the piece is not stored as a v0 piece it will return an error.
// The follow failures are possible:
2022-11-14 11:14:55 +00:00
// - sql.ErrNoRows if the v0pieceInfoDB was corrupted or recreated.
2020-12-05 16:01:42 +00:00
// - Fail to open or read v0 piece. In this case no artifacts remain.
// - Fail to Write or Commit v1 piece. In this case no artifacts remain.
// - Fail to Delete v0 piece. In this case v0 piece may remain,
// but v1 piece will exist and be preferred in future calls.
2019-11-04 16:59:45 +00:00
func ( store * Store ) MigrateV0ToV1 ( ctx context . Context , satelliteID storj . NodeID , pieceID storj . PieceID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
info , err := store . v0PieceInfo . Get ( ctx , satelliteID , pieceID )
if err != nil {
return Error . Wrap ( err )
}
err = func ( ) ( err error ) {
r , err := store . Reader ( ctx , satelliteID , pieceID )
if err != nil {
return err
}
defer func ( ) { err = errs . Combine ( err , r . Close ( ) ) } ( )
2022-08-01 10:30:33 +01:00
w , err := store . Writer ( ctx , satelliteID , pieceID , pb . PieceHashAlgorithm_SHA256 )
2019-11-04 16:59:45 +00:00
if err != nil {
return err
}
_ , err = io . Copy ( w , r )
if err != nil {
return errs . Combine ( err , w . Cancel ( ctx ) )
}
header := & pb . PieceHeader {
Hash : w . Hash ( ) ,
CreationTime : info . PieceCreation ,
Signature : info . UplinkPieceHash . GetSignature ( ) ,
OrderLimit : * info . OrderLimit ,
}
return w . Commit ( ctx , header )
} ( )
if err != nil {
return Error . Wrap ( err )
}
2023-04-05 18:03:06 +01:00
err = store . blobs . DeleteWithStorageFormat ( ctx , blobstore . BlobRef {
2019-11-04 16:59:45 +00:00
Namespace : satelliteID . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} , filestore . FormatV0 )
if store . v0PieceInfo != nil {
err = errs . Combine ( err , store . v0PieceInfo . Delete ( ctx , satelliteID , pieceID ) )
}
2019-03-18 10:55:06 +00:00
return Error . Wrap ( err )
}
2019-11-13 19:15:31 +00:00
// GetV0PieceInfoDBForTest returns this piece-store's reference to the V0 piece info DB (or nil,
2019-08-08 02:47:30 +01:00
// if this piece-store does not have one). This is ONLY intended for use with testing
// functionality.
2019-11-13 19:15:31 +00:00
func ( store StoreForTest ) GetV0PieceInfoDBForTest ( ) V0PieceInfoDBForTest {
if store . v0PieceInfo == nil {
return nil
}
return store . v0PieceInfo . ( V0PieceInfoDBForTest )
}
// GetHashAndLimit returns the PieceHash and OrderLimit associated with the specified piece. The
// piece must already have been opened for reading, and the associated *Reader passed in.
//
// Once we have migrated everything off of V0 storage and no longer need to support it, this can
// cleanly become a method directly on *Reader and will need only the 'pieceID' parameter.
func ( store * Store ) GetHashAndLimit ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID , reader * Reader ) ( pb . PieceHash , pb . OrderLimit , error ) {
if reader . StorageFormatVersion ( ) == filestore . FormatV0 {
info , err := store . GetV0PieceInfo ( ctx , satellite , pieceID )
if err != nil {
return pb . PieceHash { } , pb . OrderLimit { } , err // err is already wrapped as a storagenodedb.ErrPieceInfo
}
return * info . UplinkPieceHash , * info . OrderLimit , nil
}
header , err := reader . GetPieceHeader ( )
if err != nil {
return pb . PieceHash { } , pb . OrderLimit { } , Error . Wrap ( err )
}
pieceHash := pb . PieceHash {
2022-08-01 10:30:33 +01:00
PieceId : pieceID ,
Hash : header . GetHash ( ) ,
HashAlgorithm : header . GetHashAlgorithm ( ) ,
PieceSize : reader . Size ( ) ,
Timestamp : header . GetCreationTime ( ) ,
Signature : header . GetSignature ( ) ,
2019-11-13 19:15:31 +00:00
}
return pieceHash , header . OrderLimit , nil
2019-08-08 02:47:30 +01:00
}
2023-03-01 03:59:53 +00:00
// WalkSatellitePieces wraps FileWalker.WalkSatellitePieces.
2019-08-08 02:47:30 +01:00
func ( store * Store ) WalkSatellitePieces ( ctx context . Context , satellite storj . NodeID , walkFunc func ( StoredPieceAccess ) error ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2023-03-01 03:59:53 +00:00
return store . Filewalker . WalkSatellitePieces ( ctx , satellite , walkFunc )
2019-08-08 02:47:30 +01:00
}
2023-04-13 21:10:53 +01:00
// SatellitePiecesToTrash returns a list of piece IDs that are trash for the given satellite.
//
// If the lazy filewalker is enabled, it will be used to find the pieces to trash, otherwise
// the regular filewalker will be used. If the lazy filewalker fails, the regular filewalker
// will be used as a fallback.
func (store *Store) SatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
	defer mon.Task()(&ctx)(&err)

	if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil {
		pieceIDs, piecesCount, piecesSkipped, err = store.lazyFilewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
		if err == nil {
			return pieceIDs, piecesCount, piecesSkipped, nil
		}
		// Log and fall through: the regular filewalker below acts as a fallback.
		store.log.Error("lazyfilewalker failed", zap.Error(err))
	}

	pieceIDs, piecesCount, piecesSkipped, err = store.Filewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
	return pieceIDs, piecesCount, piecesSkipped, err
}
2020-07-16 15:18:02 +01:00
// GetExpired gets piece IDs that are expired and were created before the given time.
2019-08-08 02:47:30 +01:00
func ( store * Store ) GetExpired ( ctx context . Context , expiredAt time . Time , limit int64 ) ( _ [ ] ExpiredInfo , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
expired , err := store . expirationInfo . GetExpired ( ctx , expiredAt , limit )
if err != nil {
return nil , err
}
if int64 ( len ( expired ) ) < limit && store . v0PieceInfo != nil {
v0Expired , err := store . v0PieceInfo . GetExpired ( ctx , expiredAt , limit - int64 ( len ( expired ) ) )
if err != nil {
return nil , err
}
expired = append ( expired , v0Expired ... )
}
return expired , nil
}
2020-07-16 15:18:02 +01:00
// SetExpiration records an expiration time for the specified piece ID owned by the specified satellite.
2019-08-08 02:47:30 +01:00
func ( store * Store ) SetExpiration ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID , expiresAt time . Time ) ( err error ) {
return store . expirationInfo . SetExpiration ( ctx , satellite , pieceID , expiresAt )
}
// DeleteFailed marks piece as a failed deletion.
func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Route the record to whichever database it originally came from.
	if expired.InPieceInfo {
		return store.v0PieceInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
	}
	return store.expirationInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
}
// SpaceUsedForPieces returns *an approximation of* the disk space used by all local pieces (both
// V0 and later). This is an approximation because changes may be being applied to the filestore as
// this information is collected, and because it is possible that various errors in directory
// traversal could cause this count to be undersized.
//
2020-01-23 17:47:20 +00:00
// Returns:
// - piecesTotal: the total space used by pieces, including headers
// - piecesContentSize: the space used by piece content, not including headers
//
2020-01-07 23:34:51 +00:00
// This returns both the total size of pieces plus the contentSize of pieces.
func ( store * Store ) SpaceUsedForPieces ( ctx context . Context ) ( piecesTotal int64 , piecesContentSize int64 , err error ) {
2019-08-12 22:43:05 +01:00
if cache , ok := store . blobs . ( * BlobsUsageCache ) ; ok {
return cache . SpaceUsedForPieces ( ctx )
}
2019-08-08 02:47:30 +01:00
satellites , err := store . getAllStoringSatellites ( ctx )
if err != nil {
2020-01-07 23:34:51 +00:00
return 0 , 0 , err
2019-08-08 02:47:30 +01:00
}
for _ , satellite := range satellites {
2020-01-07 23:34:51 +00:00
pieceTotal , pieceContentSize , err := store . SpaceUsedBySatellite ( ctx , satellite )
2019-08-08 02:47:30 +01:00
if err != nil {
2020-01-07 23:34:51 +00:00
return 0 , 0 , err
2019-08-08 02:47:30 +01:00
}
2020-01-07 23:34:51 +00:00
piecesTotal += pieceTotal
piecesContentSize += pieceContentSize
2019-08-08 02:47:30 +01:00
}
2020-01-07 23:34:51 +00:00
return piecesTotal , piecesContentSize , nil
2019-08-08 02:47:30 +01:00
}
2020-01-23 17:47:20 +00:00
// SpaceUsedForTrash returns the total space used by the the piece store's
// trash, including all headers.
2019-12-21 13:11:24 +00:00
func ( store * Store ) SpaceUsedForTrash ( ctx context . Context ) ( int64 , error ) {
// If the blobs is cached, it will return the cached value
return store . blobs . SpaceUsedForTrash ( ctx )
}
// SpaceUsedForPiecesAndTrash returns the total space used by both active
2020-07-16 15:18:02 +01:00
// pieces and the trash directory.
2019-12-21 13:11:24 +00:00
func ( store * Store ) SpaceUsedForPiecesAndTrash ( ctx context . Context ) ( int64 , error ) {
2020-01-07 23:34:51 +00:00
piecesTotal , _ , err := store . SpaceUsedForPieces ( ctx )
2019-12-21 13:11:24 +00:00
if err != nil {
return 0 , err
}
2020-01-07 23:34:51 +00:00
trashTotal , err := store . SpaceUsedForTrash ( ctx )
2019-12-21 13:11:24 +00:00
if err != nil {
return 0 , err
}
2020-01-07 23:34:51 +00:00
return piecesTotal + trashTotal , nil
2019-12-21 13:11:24 +00:00
}
2019-08-08 02:47:30 +01:00
// getAllStoringSatellites lists the satellite IDs for every namespace present
// in the blob store.
func (store *Store) getAllStoringSatellites(ctx context.Context) ([]storj.NodeID, error) {
	namespaces, err := store.blobs.ListNamespaces(ctx)
	if err != nil {
		return nil, err
	}
	satellites := make([]storj.NodeID, len(namespaces))
	for i, namespace := range namespaces {
		satellites[i], err = storj.NodeIDFromBytes(namespace)
		if err != nil {
			return nil, err
		}
	}
	return satellites, nil
}
// SpaceUsedBySatellite calculates *an approximation of* how much disk space is used for local
// piece storage in the given satellite's namespace. This is an approximation because changes may
// be being applied to the filestore as this information is collected, and because it is possible
// that various errors in directory traversal could cause this count to be undersized.
//
2020-01-07 23:34:51 +00:00
// This returns both the total size of pieces plus the contentSize of pieces.
func ( store * Store ) SpaceUsedBySatellite ( ctx context . Context , satelliteID storj . NodeID ) ( piecesTotal , piecesContentSize int64 , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-12 22:43:05 +01:00
if cache , ok := store . blobs . ( * BlobsUsageCache ) ; ok {
return cache . SpaceUsedBySatellite ( ctx , satelliteID )
}
2020-01-07 23:34:51 +00:00
err = store . WalkSatellitePieces ( ctx , satelliteID , func ( access StoredPieceAccess ) error {
pieceTotal , pieceContentSize , statErr := access . Size ( ctx )
2019-08-08 02:47:30 +01:00
if statErr != nil {
2023-01-29 21:16:19 +00:00
if os . IsNotExist ( statErr ) {
return nil
}
2019-11-05 21:04:07 +00:00
store . log . Error ( "failed to stat" , zap . Error ( statErr ) , zap . Stringer ( "Piece ID" , access . PieceID ( ) ) , zap . Stringer ( "Satellite ID" , satelliteID ) )
2019-08-08 02:47:30 +01:00
// keep iterating; we want a best effort total here.
return nil
}
2020-01-07 23:34:51 +00:00
piecesTotal += pieceTotal
piecesContentSize += pieceContentSize
2019-08-08 02:47:30 +01:00
return nil
} )
if err != nil {
2020-01-07 23:34:51 +00:00
return 0 , 0 , err
2019-08-08 02:47:30 +01:00
}
2020-01-07 23:34:51 +00:00
return piecesTotal , piecesContentSize , nil
2019-08-08 02:47:30 +01:00
}
// SpaceUsedTotalAndBySatellite adds up the space used by and for all satellites for blob storage.
//
// It returns grand totals (piecesTotal, piecesContentSize) along with a
// per-satellite breakdown. Walk failures for individual satellites are
// collected and returned combined only after every satellite has been visited.
func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTotal, piecesContentSize int64, totalBySatellite map[storj.NodeID]SatelliteUsage, err error) {
	defer mon.Task()(&ctx)(&err)
	satelliteIDs, err := store.getAllStoringSatellites(ctx)
	if err != nil {
		return 0, 0, nil, Error.New("failed to enumerate satellites: %w", err)
	}

	totalBySatellite = map[storj.NodeID]SatelliteUsage{}
	var group errs.Group

	for _, satelliteID := range satelliteIDs {
		var satPiecesTotal int64
		var satPiecesContentSize int64

		// Prefer the lazy filewalker when it is enabled and configured; if its
		// walk fails for any reason, fall back to the in-process filewalker.
		failover := true
		if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil {
			satPiecesTotal, satPiecesContentSize, err = store.lazyFilewalker.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
			if err != nil {
				// Only logged, not returned: the fallback walker below still
				// gets a chance to produce the numbers for this satellite.
				store.log.Error("failed to lazywalk space used by satellite", zap.Error(err), zap.Stringer("Satellite ID", satelliteID))
			} else {
				failover = false
			}
		}

		if failover {
			satPiecesTotal, satPiecesContentSize, err = store.Filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
		}

		if err != nil {
			// Remember the failure but keep iterating so the totals stay
			// best-effort across the remaining satellites.
			group.Add(err)
		}

		piecesTotal += satPiecesTotal
		piecesContentSize += satPiecesContentSize
		totalBySatellite[satelliteID] = SatelliteUsage{
			Total:       satPiecesTotal,
			ContentSize: satPiecesContentSize,
		}
	}
	return piecesTotal, piecesContentSize, totalBySatellite, group.Err()
}
// GetV0PieceInfo fetches the Info record from the V0 piece info database.
// It is of no use when a piece does not have filestore.FormatV0 storage.
func (store *Store) GetV0PieceInfo(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Info, error) {
	return store.v0PieceInfo.Get(ctx, satellite, pieceID)
}
// StorageStatus contains information about the disk space the store is using.
type StorageStatus struct {
	// DiskUsed is the used disk space in bytes.
	// Currently always reported as -1 by Store.StorageStatus (not yet computed).
	DiskUsed int64
	// DiskFree is the free disk space, in bytes, reported by the blob store.
	DiskFree int64
}
// StorageStatus returns information about the disk.
2019-06-04 13:31:39 +01:00
func ( store * Store ) StorageStatus ( ctx context . Context ) ( _ StorageStatus , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-09-10 14:05:29 +01:00
diskFree , err := store . blobs . FreeSpace ( ctx )
2019-03-18 10:55:06 +00:00
if err != nil {
return StorageStatus { } , err
}
return StorageStatus {
DiskUsed : - 1 , // TODO set value
DiskFree : diskFree ,
} , nil
}
// CheckWritability tests writability of the storage directory by creating and deleting a file.
// The check itself is delegated to the underlying blob store.
func (store *Store) CheckWritability(ctx context.Context) error {
	return store.blobs.CheckWritability(ctx)
}
2023-03-14 11:09:03 +00:00
// CheckWritabilityWithTimeout tests writability of the storage directory by creating and deleting a file with a timeout.
func ( store * Store ) CheckWritabilityWithTimeout ( ctx context . Context , timeout time . Duration ) error {
ctx , cancel := context . WithTimeout ( ctx , timeout )
defer cancel ( )
ch := make ( chan error , 1 )
go func ( ) {
ch <- store . CheckWritability ( ctx )
} ( )
select {
case err := <- ch :
return err
case <- ctx . Done ( ) :
return ctx . Err ( )
}
}
2022-12-14 20:04:41 +00:00
// Stat looks up disk metadata on the blob file.
2023-04-05 18:03:06 +01:00
func ( store * Store ) Stat ( ctx context . Context , satellite storj . NodeID , pieceID storj . PieceID ) ( blobstore . BlobInfo , error ) {
return store . blobs . Stat ( ctx , blobstore . BlobRef {
2022-12-14 20:04:41 +00:00
Namespace : satellite . Bytes ( ) ,
Key : pieceID . Bytes ( ) ,
} )
}
// storedPieceAccess wraps an on-disk blob so it can be used as a
// StoredPieceAccess (see the walk callbacks above).
type storedPieceAccess struct {
	blobstore.BlobInfo // embedded metadata about the underlying blob

	pieceID storj.PieceID   // piece ID decoded from the blob ref key
	blobs   blobstore.Blobs // blob store used to open the blob contents (see CreationTime)
}
2023-04-05 18:03:06 +01:00
func newStoredPieceAccess ( blobs blobstore . Blobs , blobInfo blobstore . BlobInfo ) ( storedPieceAccess , error ) {
2023-03-01 03:59:53 +00:00
ref := blobInfo . BlobRef ( )
pieceID , err := storj . PieceIDFromBytes ( ref . Key )
2019-08-08 02:47:30 +01:00
if err != nil {
return storedPieceAccess { } , err
}
2023-03-01 03:59:53 +00:00
2019-08-08 02:47:30 +01:00
return storedPieceAccess {
BlobInfo : blobInfo ,
2023-03-01 03:59:53 +00:00
blobs : blobs ,
2019-08-08 02:47:30 +01:00
pieceID : pieceID ,
} , nil
}
// PieceID returns the piece ID of the piece (decoded from the blob key at
// construction time).
func (access storedPieceAccess) PieceID() storj.PieceID {
	return access.pieceID
}
// Satellite returns the satellite ID that owns the piece, decoded from the
// blob reference namespace.
func (access storedPieceAccess) Satellite() (storj.NodeID, error) {
	return storj.NodeIDFromBytes(access.BlobRef().Namespace)
}
2020-07-16 15:18:02 +01:00
// Size gives the size of the piece on disk, and the size of the content (not including the piece header, if applicable).
2019-12-21 13:11:24 +00:00
func ( access storedPieceAccess ) Size ( ctx context . Context ) ( size , contentSize int64 , err error ) {
2019-08-08 02:47:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
stat , err := access . Stat ( ctx )
if err != nil {
2019-12-21 13:11:24 +00:00
return 0 , 0 , err
2019-08-08 02:47:30 +01:00
}
size = stat . Size ( )
2019-12-21 13:11:24 +00:00
contentSize = size
2019-08-08 02:47:30 +01:00
if access . StorageFormatVersion ( ) >= filestore . FormatV1 {
2019-12-21 13:11:24 +00:00
contentSize -= V1PieceHeaderReservedArea
2019-08-08 02:47:30 +01:00
}
2019-12-21 13:11:24 +00:00
return size , contentSize , nil
2019-08-08 02:47:30 +01:00
}
// CreationTime returns the piece creation time as given in the original PieceHash (which is likely
// not the same as the file mtime). This requires opening the file and unmarshaling the piece
// header. If exact precision is not required, ModTime() may be a better solution.
func ( access storedPieceAccess ) CreationTime ( ctx context . Context ) ( cTime time . Time , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2023-03-01 03:59:53 +00:00
blob , err := access . blobs . OpenWithStorageFormat ( ctx , access . BlobInfo . BlobRef ( ) , access . BlobInfo . StorageFormatVersion ( ) )
2019-08-08 02:47:30 +01:00
if err != nil {
2023-03-01 03:59:53 +00:00
if os . IsNotExist ( err ) {
return time . Time { } , err
}
2019-08-08 02:47:30 +01:00
return time . Time { } , err
}
2023-03-01 03:59:53 +00:00
reader , err := NewReader ( blob )
2019-08-08 02:47:30 +01:00
if err != nil {
return time . Time { } , err
}
2023-03-13 13:15:32 +00:00
defer func ( ) {
err = errs . Combine ( err , reader . Close ( ) )
} ( )
2019-08-08 02:47:30 +01:00
header , err := reader . GetPieceHeader ( )
if err != nil {
return time . Time { } , err
}
return header . CreationTime , nil
}
// ModTime returns a less-precise piece creation time than CreationTime, but is generally
// much faster. This gets the piece creation time from the filesystem instead of the
// piece header.
func (access storedPieceAccess) ModTime(ctx context.Context) (mTime time.Time, err error) {
	defer mon.Task()(&ctx)(&err)

	info, err := access.Stat(ctx)
	if err != nil {
		return time.Time{}, err
	}
	return info.ModTime(), nil
}