2019-08-08 02:47:30 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package storagenodedb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
|
|
|
"storj.io/storj/pkg/storj"
|
|
|
|
"storj.io/storj/storagenode/pieces"
|
|
|
|
)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
// ErrPieceExpiration represents errors from the piece expiration database.
|
|
|
|
var ErrPieceExpiration = errs.Class("piece expiration error")
|
|
|
|
|
2019-08-08 02:47:30 +01:00
|
|
|
type pieceExpirationDB struct {
|
2019-08-21 15:32:25 +01:00
|
|
|
location string
|
|
|
|
SQLDB
|
2019-08-08 02:47:30 +01:00
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
// newPieceExpirationDB returns a new instance of pieceExpirationDB initialized with the specified database.
|
|
|
|
func newPieceExpirationDB(db SQLDB, location string) *pieceExpirationDB {
|
|
|
|
return &pieceExpirationDB{
|
|
|
|
location: location,
|
|
|
|
SQLDB: db,
|
|
|
|
}
|
|
|
|
}
|
2019-08-08 02:47:30 +01:00
|
|
|
|
|
|
|
// GetExpired gets piece IDs that expire or have expired before the given time
|
|
|
|
func (db *pieceExpirationDB) GetExpired(ctx context.Context, expiresBefore time.Time, limit int64) (expiredPieceIDs []pieces.ExpiredInfo, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
rows, err := db.QueryContext(ctx, `
|
2019-08-08 02:47:30 +01:00
|
|
|
SELECT satellite_id, piece_id
|
|
|
|
FROM piece_expirations
|
|
|
|
WHERE piece_expiration < ?
|
|
|
|
AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?)
|
|
|
|
LIMIT ?
|
|
|
|
`, expiresBefore.UTC(), expiresBefore.UTC(), limit)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrPieceExpiration.Wrap(err)
|
2019-08-08 02:47:30 +01:00
|
|
|
}
|
|
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
|
|
|
|
for rows.Next() {
|
|
|
|
var satelliteID storj.NodeID
|
|
|
|
var pieceID storj.PieceID
|
|
|
|
err = rows.Scan(&satelliteID, &pieceID)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrPieceExpiration.Wrap(err)
|
2019-08-08 02:47:30 +01:00
|
|
|
}
|
|
|
|
expiredPieceIDs = append(expiredPieceIDs, pieces.ExpiredInfo{
|
|
|
|
SatelliteID: satelliteID,
|
|
|
|
PieceID: pieceID,
|
|
|
|
InPieceInfo: false,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return expiredPieceIDs, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetExpiration sets an expiration time for the given piece ID on the given satellite
|
|
|
|
func (db *pieceExpirationDB) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
_, err = db.ExecContext(ctx, `
|
2019-08-08 02:47:30 +01:00
|
|
|
INSERT INTO piece_expirations(satellite_id, piece_id, piece_expiration)
|
|
|
|
VALUES (?,?,?)
|
|
|
|
`, satellite, pieceID, expiresAt.UTC())
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrPieceExpiration.Wrap(err)
|
2019-08-08 02:47:30 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteExpiration removes an expiration record for the given piece ID on the given satellite
|
|
|
|
func (db *pieceExpirationDB) DeleteExpiration(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (found bool, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
result, err := db.ExecContext(ctx, `
|
2019-08-08 02:47:30 +01:00
|
|
|
DELETE FROM piece_expirations
|
|
|
|
WHERE satellite_id = ? AND piece_id = ?
|
|
|
|
`, satelliteID, pieceID)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
numRows, err := result.RowsAffected()
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
return numRows > 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteFailed marks an expiration record as having experienced a failure in deleting the piece
|
|
|
|
// from the disk
|
|
|
|
func (db *pieceExpirationDB) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, when time.Time) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
_, err = db.ExecContext(ctx, `
|
2019-08-08 02:47:30 +01:00
|
|
|
UPDATE piece_expirations
|
|
|
|
SET deletion_failed_at = ?
|
|
|
|
WHERE satellite_id = ?
|
|
|
|
AND piece_id = ?
|
|
|
|
`, when.UTC(), satelliteID, pieceID)
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrPieceExpiration.Wrap(err)
|
2019-08-08 02:47:30 +01:00
|
|
|
}
|