storj/storagenode/storagenodedb/pieceinfo.go
paul cannon 17bdb5e9e5
move piece info into files (#2629)

Deprecate the pieceinfo database, and start storing piece info as a header to
piece files. Institute a "storage format version" concept allowing us to handle
pieces stored under multiple different types of storage. Add a piece_expirations
table which will still be used to track expiration times, so we can query it,
but which should be much smaller than the pieceinfo database would be for the
same number of pieces. (Only pieces with expiration times need to be stored in
piece_expirations, and we don't need to store large byte blobs like the
serialized order limit, etc.) Use specialized names for accessing any
functionality related only to dealing with V0 pieces (e.g.,
`store.V0PieceInfo()`). Move SpaceUsed-type functionality under the purview of
the piece store. Add some generic interfaces for traversing all blobs or all
pieces. Add lots of tests.

2019-08-07 20:47:30 -05:00
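
As a quick orientation for the API below (a hypothetical caller sketch; `db`,
`ctx`, `satelliteID`, and `pieceID` are assumed and not defined in this file):

	v0db := db.V0PieceInfo() // V0-only functionality sits behind this accessor
	info, err := v0db.Get(ctx, satelliteID, pieceID)
	if err != nil {
		return err // typically: no V0 info row, i.e. not stored under format V0
	}
	_ = info.OrderLimit // the serialized order limit is kept only for V0 pieces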


// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package storagenodedb

import (
	"context"
	"os"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/zeebo/errs"

	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/storage"
	"storj.io/storj/storage/filestore"
	"storj.io/storj/storagenode/pieces"
)

type v0PieceInfo struct {
*InfoDB
}
// V0PieceInfo returns database for storing piece information
func (db *DB) V0PieceInfo() pieces.V0PieceInfoDB { return db.info.V0PieceInfo() }
// V0PieceInfo returns database for storing piece information
func (db *InfoDB) V0PieceInfo() pieces.V0PieceInfoDB { return &db.v0PieceInfo }
// Add inserts piece information into the database.
func (db *v0PieceInfo) Add(ctx context.Context, info *pieces.Info) (err error) {
defer mon.Task()(&ctx)(&err)
orderLimit, err := proto.Marshal(info.OrderLimit)
if err != nil {
return ErrInfo.Wrap(err)
}
uplinkPieceHash, err := proto.Marshal(info.UplinkPieceHash)
if err != nil {
return ErrInfo.Wrap(err)
}
var pieceExpiration *time.Time
if !info.PieceExpiration.IsZero() {
utcExpiration := info.PieceExpiration.UTC()
pieceExpiration = &utcExpiration
}
// TODO remove `uplink_cert_id` from DB
_, err = db.db.ExecContext(ctx, db.Rebind(`
INSERT INTO
pieceinfo_(satellite_id, piece_id, piece_size, piece_creation, piece_expiration, order_limit, uplink_piece_hash, uplink_cert_id)
VALUES (?,?,?,?,?,?,?,?)
`), info.SatelliteID, info.PieceID, info.PieceSize, info.PieceCreation.UTC(), pieceExpiration, orderLimit, uplinkPieceHash, 0)
return ErrInfo.Wrap(err)
}
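// A hypothetical sketch of feeding Add (all inputs are assumed; in practice
// the piece store populates pieces.Info when a V0 piece is committed):
//
//	err := db.V0PieceInfo().Add(ctx, &pieces.Info{
//		SatelliteID:     satelliteID,
//		PieceID:         pieceID,
//		PieceSize:       pieceSize,
//		PieceCreation:   time.Now().UTC(),
//		PieceExpiration: expiration,      // zero time is stored as NULL
//		OrderLimit:      orderLimit,      // *pb.OrderLimit
//		UplinkPieceHash: uplinkPieceHash, // *pb.PieceHash
//	})
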
func (db *v0PieceInfo) getAllPiecesOwnedBy(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID) (pieceInfos []v0StoredPieceAccess, err error) {
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT piece_id, piece_size, piece_creation, piece_expiration
FROM pieceinfo_
WHERE satellite_id = ?
ORDER BY piece_id
`), satelliteID)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
	// err must be a named return so the deferred Close error can propagate
	defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
pieceInfos = append(pieceInfos, v0StoredPieceAccess{
blobStore: blobStore,
satellite: satelliteID,
})
thisAccess := &pieceInfos[len(pieceInfos)-1]
err = rows.Scan(&thisAccess.pieceID, &thisAccess.pieceSize, &thisAccess.creationTime, &thisAccess.expirationTime)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
}
	return pieceInfos, ErrInfo.Wrap(rows.Err())
}
// WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored with storage
// format V0 in the namespace of the given satellite. If walkFunc returns a non-nil error,
// WalkSatelliteV0Pieces will stop iterating and return the error immediately. The ctx
// parameter is intended specifically to allow canceling iteration early.
//
// If blobStore is nil, the .Stat() and .FullPath() methods of the provided StoredPieceAccess
// instances will not work, but otherwise everything should be ok.
func (db *v0PieceInfo) WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID, walkFunc func(pieces.StoredPieceAccess) error) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: is it worth paging this query? we hope that SNs will not yet have too many V0 pieces.
pieceInfos, err := db.getAllPiecesOwnedBy(ctx, blobStore, satelliteID)
if err != nil {
return err
}
// note we must not keep a transaction open with the db when calling walkFunc; the callback
// might need to make db calls as well
for i := range pieceInfos {
if err := ctx.Err(); err != nil {
return err
}
if err := walkFunc(&pieceInfos[i]); err != nil {
return err
}
}
return nil
}
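// A hypothetical usage sketch (not part of the original file): totaling the
// content size of one satellite's V0 pieces with the walker. Everything except
// `db`, `ctx`, `blobStore`, and `satelliteID` comes from this file or the
// pieces package.
//
//	var total int64
//	err := db.V0PieceInfo().WalkSatelliteV0Pieces(ctx, blobStore, satelliteID,
//		func(access pieces.StoredPieceAccess) error {
//			size, sizeErr := access.ContentSize(ctx)
//			if sizeErr != nil {
//				return sizeErr // a non-nil error stops iteration immediately
//			}
//			total += size
//			return nil
//		})
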
// Get gets piece information by satellite id and piece id.
func (db *v0PieceInfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (_ *pieces.Info, err error) {
defer mon.Task()(&ctx)(&err)
info := &pieces.Info{}
info.SatelliteID = satelliteID
info.PieceID = pieceID
var orderLimit []byte
var uplinkPieceHash []byte
var pieceExpiration *time.Time
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT piece_size, piece_creation, piece_expiration, order_limit, uplink_piece_hash
FROM pieceinfo_
WHERE satellite_id = ? AND piece_id = ?
`), satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceCreation, &pieceExpiration, &orderLimit, &uplinkPieceHash)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
if pieceExpiration != nil {
info.PieceExpiration = *pieceExpiration
}
info.OrderLimit = &pb.OrderLimit{}
err = proto.Unmarshal(orderLimit, info.OrderLimit)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
info.UplinkPieceHash = &pb.PieceHash{}
err = proto.Unmarshal(uplinkPieceHash, info.UplinkPieceHash)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
return info, nil
}
// Delete deletes piece information.
func (db *v0PieceInfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo_
WHERE satellite_id = ?
AND piece_id = ?
`), satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// DeleteFailed marks piece as a failed deletion.
func (db *v0PieceInfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo_
SET deletion_failed_at = ?
WHERE satellite_id = ?
AND piece_id = ?
`), now.UTC(), satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// GetExpired gets ExpiredInfo records for pieces that are expired.
func (db *v0PieceInfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT satellite_id, piece_id
FROM pieceinfo_
WHERE piece_expiration IS NOT NULL
AND piece_expiration < ?
AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?)
ORDER BY satellite_id
LIMIT ?
`), expiredAt.UTC(), expiredAt.UTC(), limit)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
info := pieces.ExpiredInfo{InPieceInfo: true}
err = rows.Scan(&info.SatelliteID, &info.PieceID)
if err != nil {
return infos, ErrInfo.Wrap(err)
}
infos = append(infos, info)
}
	return infos, ErrInfo.Wrap(rows.Err())
}
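// A hypothetical collection-pass sketch (the real collector also removes the
// blob itself through the piece store; `v0db`, `ctx`, `now`, and `batchSize`
// are assumed here):
//
//	expired, err := v0db.GetExpired(ctx, now, batchSize)
//	if err != nil {
//		return err
//	}
//	for _, e := range expired {
//		if delErr := v0db.Delete(ctx, e.SatelliteID, e.PieceID); delErr != nil {
//			// record the failure; GetExpired skips rows whose
//			// deletion_failed_at equals the cutoff passed in
//			_ = v0db.DeleteFailed(ctx, e.SatelliteID, e.PieceID, now)
//		}
//	}
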
type v0StoredPieceAccess struct {
blobStore storage.Blobs
satellite storj.NodeID
pieceID storj.PieceID
pieceSize int64
creationTime time.Time
expirationTime *time.Time
blobInfo storage.BlobInfo
}
// PieceID returns the piece ID for the piece
func (v0Access v0StoredPieceAccess) PieceID() storj.PieceID {
return v0Access.pieceID
}
// Satellite returns the satellite ID that owns the piece
func (v0Access v0StoredPieceAccess) Satellite() (storj.NodeID, error) {
return v0Access.satellite, nil
}
// BlobRef returns the relevant storage.BlobRef locator for the piece
func (v0Access v0StoredPieceAccess) BlobRef() storage.BlobRef {
return storage.BlobRef{
Namespace: v0Access.satellite.Bytes(),
Key: v0Access.pieceID.Bytes(),
}
}
// fillInBlobAccess lazily looks up and caches the blob metadata for this piece.
// It needs a pointer receiver so the cached blobInfo is actually retained.
func (v0Access *v0StoredPieceAccess) fillInBlobAccess(ctx context.Context) error {
if v0Access.blobInfo == nil {
if v0Access.blobStore == nil {
return errs.New("this v0StoredPieceAccess instance has no blobStore reference, and cannot look up the relevant blob")
}
blobInfo, err := v0Access.blobStore.StatWithStorageFormat(ctx, v0Access.BlobRef(), v0Access.StorageFormatVersion())
if err != nil {
return err
}
v0Access.blobInfo = blobInfo
}
return nil
}
// ContentSize gives the size of the piece content (not including the piece header, if applicable)
func (v0Access v0StoredPieceAccess) ContentSize(ctx context.Context) (int64, error) {
return v0Access.pieceSize, nil
}
// CreationTime returns the piece creation time as given in the original order (which is not
// necessarily the same as the file mtime).
func (v0Access v0StoredPieceAccess) CreationTime(ctx context.Context) (time.Time, error) {
return v0Access.creationTime, nil
}
// ModTime returns the same thing as CreationTime for V0 blobs. The intent is for ModTime to
// be a little faster when CreationTime is too slow and the precision is not needed, but in
// this case we already have the exact creation time from the database.
func (v0Access v0StoredPieceAccess) ModTime(ctx context.Context) (time.Time, error) {
return v0Access.creationTime, nil
}
// FullPath gives the full path to the on-disk blob file
func (v0Access *v0StoredPieceAccess) FullPath(ctx context.Context) (string, error) {
if err := v0Access.fillInBlobAccess(ctx); err != nil {
return "", err
}
return v0Access.blobInfo.FullPath(ctx)
}
// StorageFormatVersion indicates the storage format version used to store the piece
func (v0Access v0StoredPieceAccess) StorageFormatVersion() storage.FormatVersion {
return filestore.FormatV0
}
// Stat does a stat on the on-disk blob file
func (v0Access *v0StoredPieceAccess) Stat(ctx context.Context) (os.FileInfo, error) {
if err := v0Access.fillInBlobAccess(ctx); err != nil {
return nil, err
}
return v0Access.blobInfo.Stat(ctx)
}