// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"os"
"time"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode/pieces"
)
// ErrPieceInfo represents errors from the piece info database; every error
// returned by v0PieceInfoDB methods is wrapped with this class.
var ErrPieceInfo = errs.Class("v0pieceinfodb error")
// PieceInfoDBName represents the database name.
// It is presumably used as the lookup key for this database's connection — TODO confirm against the containing package.
const PieceInfoDBName = "pieceinfo"
// v0PieceInfoDB provides access to the piece info database, which stores
// metadata for locally held pieces in storage format V0.
type v0PieceInfoDB struct {
dbContainerImpl
}
// Add inserts piece information into the database.
//
// The order limit and uplink piece hash are stored as serialized protobufs.
// A zero PieceExpiration is stored as NULL so that non-expiring pieces are
// excluded from expiration queries; all times are normalized to UTC.
func (db *v0PieceInfoDB) Add(ctx context.Context, info *pieces.Info) (err error) {
	defer mon.Task()(&ctx)(&err)

	orderLimit, err := proto.Marshal(info.OrderLimit)
	if err != nil {
		return ErrPieceInfo.Wrap(err)
	}

	uplinkPieceHash, err := proto.Marshal(info.UplinkPieceHash)
	if err != nil {
		return ErrPieceInfo.Wrap(err)
	}

	// Only store an expiration when one was actually set.
	var pieceExpiration *time.Time
	if !info.PieceExpiration.IsZero() {
		utcExpiration := info.PieceExpiration.UTC()
		pieceExpiration = &utcExpiration
	}

	// TODO remove `uplink_cert_id` from DB
	_, err = db.ExecContext(ctx, `
		INSERT INTO
			pieceinfo_(satellite_id, piece_id, piece_size, piece_creation, piece_expiration, order_limit, uplink_piece_hash, uplink_cert_id)
		VALUES (?,?,?,?,?,?,?,?)
	`, info.SatelliteID, info.PieceID, info.PieceSize, info.PieceCreation.UTC(), pieceExpiration, orderLimit, uplinkPieceHash, 0)
	return ErrPieceInfo.Wrap(err)
}
// getAllPiecesOwnedBy loads the V0 piece rows belonging to satelliteID,
// ordered by piece ID. Each returned access carries blobStore so that
// Stat/FullPath can later resolve the on-disk blob.
func (db *v0PieceInfoDB) getAllPiecesOwnedBy(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID) (_ []v0StoredPieceAccess, err error) {
	rows, err := db.QueryContext(ctx, `
		SELECT piece_id, piece_size, piece_creation, piece_expiration
		FROM pieceinfo_
		WHERE satellite_id = ?
		ORDER BY piece_id
	`, satelliteID)
	if err != nil {
		return nil, ErrPieceInfo.Wrap(err)
	}
	// err is a named result so this deferred assignment actually reaches the
	// caller; with an unnamed return the Close error would be silently lost.
	defer func() { err = errs.Combine(err, rows.Close()) }()

	var pieceInfos []v0StoredPieceAccess
	for rows.Next() {
		pieceInfos = append(pieceInfos, v0StoredPieceAccess{
			blobStore: blobStore,
			satellite: satelliteID,
		})
		// Scan straight into the element just appended to avoid an extra copy.
		thisAccess := &pieceInfos[len(pieceInfos)-1]
		err = rows.Scan(&thisAccess.pieceID, &thisAccess.pieceSize, &thisAccess.creationTime, &thisAccess.expirationTime)
		if err != nil {
			return nil, ErrPieceInfo.Wrap(err)
		}
	}
	// Surface any iteration error (e.g. a connection failure mid-scan).
	if err := rows.Err(); err != nil {
		return nil, ErrPieceInfo.Wrap(err)
	}
	return pieceInfos, nil
}
// WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored
// with storage format V0 in the namespace of the given satellite. If walkFunc
// returns a non-nil error, WalkSatelliteV0Pieces will stop iterating and return
// the error immediately. The ctx parameter is intended specifically to allow
// canceling iteration early.
//
// If blobStore is nil, the .Stat() and .FullPath() methods of the provided
// StoredPieceAccess instances will not work, but otherwise everything should be ok.
func (db *v0PieceInfoDB) WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID, walkFunc func(pieces.StoredPieceAccess) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: is it worth paging this query? we hope that SNs will not yet have too many V0 pieces.
	pieceInfos, err := db.getAllPiecesOwnedBy(ctx, blobStore, satelliteID)
	if err != nil {
		return err
	}
	// note we must not keep a transaction open with the db when calling walkFunc; the callback
	// might need to make db calls as well
	for i := 0; i < len(pieceInfos); i++ {
		// Honor cancellation between callbacks so long walks can be aborted.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		if walkErr := walkFunc(&pieceInfos[i]); walkErr != nil {
			return walkErr
		}
	}
	return nil
}
// Get gets piece information by satellite id and piece id.
//
// The stored order limit and uplink piece hash protobufs are deserialized
// into the returned pieces.Info. A NULL piece_expiration leaves
// PieceExpiration as the zero time (the piece does not expire).
func (db *v0PieceInfoDB) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (_ *pieces.Info, err error) {
	defer mon.Task()(&ctx)(&err)
	info := &pieces.Info{}
	info.SatelliteID = satelliteID
	info.PieceID = pieceID

	var orderLimit []byte
	var uplinkPieceHash []byte
	var pieceExpiration *time.Time

	err = db.QueryRowContext(ctx, `
		SELECT piece_size, piece_creation, piece_expiration, order_limit, uplink_piece_hash
		FROM pieceinfo_
		WHERE satellite_id = ? AND piece_id = ?
	`, satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceCreation, &pieceExpiration, &orderLimit, &uplinkPieceHash)
	if err != nil {
		return nil, ErrPieceInfo.Wrap(err)
	}

	if pieceExpiration != nil {
		info.PieceExpiration = *pieceExpiration
	}

	info.OrderLimit = &pb.OrderLimit{}
	if err := proto.Unmarshal(orderLimit, info.OrderLimit); err != nil {
		return nil, ErrPieceInfo.Wrap(err)
	}

	info.UplinkPieceHash = &pb.PieceHash{}
	if err := proto.Unmarshal(uplinkPieceHash, info.UplinkPieceHash); err != nil {
		return nil, ErrPieceInfo.Wrap(err)
	}
	return info, nil
}
// Delete deletes piece information for the given satellite/piece pair.
// Deleting a non-existent row is not an error.
func (db *v0PieceInfoDB) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)
	_, err = db.ExecContext(ctx, `
		DELETE FROM pieceinfo_
		WHERE satellite_id = ?
		  AND piece_id = ?
	`, satelliteID, pieceID)
	return ErrPieceInfo.Wrap(err)
}
2019-05-08 12:11:59 +01:00
// DeleteFailed marks piece as a failed deletion.
func (db *v0PieceInfoDB) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, `
2019-07-16 17:31:29 +01:00
UPDATE pieceinfo_
2019-05-08 12:11:59 +01:00
SET deletion_failed_at = ?
WHERE satellite_id = ?
2019-05-08 12:11:59 +01:00
AND piece_id = ?
`, now.UTC(), satelliteID, pieceID)
2019-05-08 12:11:59 +01:00
return ErrPieceInfo.Wrap(err)
}
// GetExpired gets ExpiredInfo records for pieces that are expired.
func (db *v0PieceInfoDB) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.QueryContext(ctx, `
SELECT satellite_id, piece_id
2019-07-16 17:31:29 +01:00
FROM pieceinfo_
WHERE piece_expiration IS NOT NULL
AND piece_expiration < ?
AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?)
2019-05-08 12:11:59 +01:00
ORDER BY satellite_id
LIMIT ?
`, expiredAt.UTC(), expiredAt.UTC(), limit)
if err != nil {
return nil, ErrPieceInfo.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
info := pieces.ExpiredInfo{InPieceInfo: true}
err = rows.Scan(&info.SatelliteID, &info.PieceID)
if err != nil {
return infos, ErrPieceInfo.Wrap(err)
}
2019-05-08 12:11:59 +01:00
infos = append(infos, info)
}
2019-05-08 12:11:59 +01:00
return infos, nil
}
// v0StoredPieceAccess provides access to a single V0-format piece using
// metadata loaded from the piece info database. Instances are handed to
// walkFunc callbacks by WalkSatelliteV0Pieces.
type v0StoredPieceAccess struct {
blobStore storage.Blobs // may be nil; then Stat/FullPath cannot resolve the blob
satellite storj.NodeID
pieceID storj.PieceID
pieceSize int64
creationTime time.Time
expirationTime *time.Time // nil when the piece does not expire
blobInfo storage.BlobInfo // lazily populated by fillInBlobAccess
}
// PieceID returns the piece ID for the piece.
func (v0Access v0StoredPieceAccess) PieceID() storj.PieceID {
return v0Access.pieceID
}
// Satellite returns the satellite ID that owns the piece. The error is always
// nil here; presumably the error result exists to satisfy the
// pieces.StoredPieceAccess interface used by WalkSatelliteV0Pieces.
func (v0Access v0StoredPieceAccess) Satellite() (storj.NodeID, error) {
return v0Access.satellite, nil
}
// BlobRef builds the storage.BlobRef locating this piece's blob: the
// satellite ID is the namespace and the piece ID is the key.
func (v0Access v0StoredPieceAccess) BlobRef() storage.BlobRef {
	var ref storage.BlobRef
	ref.Namespace = v0Access.satellite.Bytes()
	ref.Key = v0Access.pieceID.Bytes()
	return ref
}
func (v0Access v0StoredPieceAccess) fillInBlobAccess(ctx context.Context) error {
if v0Access.blobInfo == nil {
if v0Access.blobStore == nil {
return errs.New("this v0StoredPieceAccess instance has no blobStore reference, and cannot look up the relevant blob")
}
blobInfo, err := v0Access.blobStore.StatWithStorageFormat(ctx, v0Access.BlobRef(), v0Access.StorageFormatVersion())
if err != nil {
return err
}
v0Access.blobInfo = blobInfo
2019-05-08 12:11:59 +01:00
}
return nil
}
// ContentSize gives the size of the piece content (not including the piece header, if applicable).
// The size comes from the database row, so no disk access is needed.
func (v0Access v0StoredPieceAccess) ContentSize(ctx context.Context) (int64, error) {
return v0Access.pieceSize, nil
}
// CreationTime returns the piece creation time as given in the original order (which is not
// necessarily the same as the file mtime). The value comes from the database row.
func (v0Access v0StoredPieceAccess) CreationTime(ctx context.Context) (time.Time, error) {
return v0Access.creationTime, nil
}
// ModTime returns the same thing as CreationTime for V0 blobs. The intent is for ModTime to
// be a little faster when CreationTime is too slow and the precision is not needed, but in
// this case we already have the exact creation time from the database.
func (v0Access v0StoredPieceAccess) ModTime(ctx context.Context) (time.Time, error) {
return v0Access.creationTime, nil
}
// FullPath gives the full path to the on-disk blob file.
func (v0Access v0StoredPieceAccess) FullPath(ctx context.Context) (string, error) {
	// Make sure blob metadata has been looked up before asking for the path.
	err := v0Access.fillInBlobAccess(ctx)
	if err != nil {
		return "", err
	}
	return v0Access.blobInfo.FullPath(ctx)
}
// StorageFormatVersion indicates the storage format version used to store the piece.
// Always FormatV0: this access type only represents V0-format pieces.
func (v0Access v0StoredPieceAccess) StorageFormatVersion() storage.FormatVersion {
return filestore.FormatV0
}
// Stat does a stat on the on-disk blob file.
func (v0Access v0StoredPieceAccess) Stat(ctx context.Context) (os.FileInfo, error) {
	// Make sure blob metadata has been looked up before stat-ing.
	err := v0Access.fillInBlobAccess(ctx)
	if err != nil {
		return nil, err
	}
	return v0Access.blobInfo.Stat(ctx)
}