14fd6a9ef0
This change updates the storagenode piecestore apis to expose access to the full piece size stored on disk. Previously we only had access to (and only kept a cache of) the content size used for all pieces. This was inaccurate when reporting the amount of disk space used by nodes. We now have access to the total content size, as well as the total disk usage, of all pieces. The pieces cache also keeps a cache of the total piece size along with the content size. Change-Id: I4fffe7e1257e04c46021a2e37c5adc6fe69bee55
233 lines
6.4 KiB
Go
233 lines
6.4 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package storagenodedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/storj"
|
|
"storj.io/storj/storagenode/pieces"
|
|
)
|
|
|
|
// ErrPieceSpaceUsed represents errors from the piece spaced used database.
|
|
var ErrPieceSpaceUsed = errs.Class("piece space used error")
|
|
|
|
// PieceSpaceUsedDBName represents the database name.
|
|
const PieceSpaceUsedDBName = "piece_spaced_used"
|
|
|
|
// trashTotalRowName is the special "satellite_id" used in the db to represent
|
|
// the total stored for trash. Similar to how we use NULL as a special value
|
|
// for satellite_id to represent the total for pieces, this value is used to
|
|
// identify the row storing the total for trash.
|
|
//
|
|
// It is intentionally an otherwise-invalid satellite_id (not 32 bytes) so that
|
|
// it cannot conflict with real satellite_id names
|
|
const trashTotalRowName = "trashtotal"
|
|
|
|
type pieceSpaceUsedDB struct {
|
|
dbContainerImpl
|
|
}
|
|
|
|
// Init creates the total pieces and total trash records if they don't already exist
|
|
func (db *pieceSpaceUsedDB) Init(ctx context.Context) (err error) {
|
|
totalPiecesRow := db.QueryRowContext(ctx, `
|
|
SELECT total
|
|
FROM piece_space_used
|
|
WHERE satellite_id IS NULL
|
|
AND satellite_id IS NOT ?;
|
|
`, trashTotalRowName)
|
|
|
|
var piecesTotal int64
|
|
err = totalPiecesRow.Scan(&piecesTotal)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
err = db.createInitTotalPieces(ctx)
|
|
if err != nil {
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
totalTrashRow := db.QueryRowContext(ctx, `
|
|
SELECT total
|
|
FROM piece_space_used
|
|
WHERE satellite_id = ?;
|
|
`, trashTotalRowName)
|
|
|
|
var trashTotal int64
|
|
err = totalTrashRow.Scan(&trashTotal)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
err = db.createInitTotalTrash(ctx)
|
|
if err != nil {
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
|
|
func (db *pieceSpaceUsedDB) createInitTotalPieces(ctx context.Context) (err error) {
|
|
_, err = db.ExecContext(ctx, `
|
|
INSERT INTO piece_space_used (total, content_size) VALUES (0, 0)
|
|
`)
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
|
|
func (db *pieceSpaceUsedDB) createInitTotalTrash(ctx context.Context) (err error) {
|
|
_, err = db.ExecContext(ctx, `
|
|
INSERT INTO piece_space_used (total, content_size, satellite_id) VALUES (0, 0, ?)
|
|
`, trashTotalRowName)
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
|
|
// GetPieceTotal returns the total space used (total and contentSize) for all pieces stored
|
|
func (db *pieceSpaceUsedDB) GetPieceTotals(ctx context.Context) (total int64, contentSize int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
row := db.QueryRowContext(ctx, `
|
|
SELECT total, content_size
|
|
FROM piece_space_used
|
|
WHERE satellite_id IS NULL;
|
|
`)
|
|
|
|
err = row.Scan(&total, &contentSize)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return 0, 0, nil
|
|
}
|
|
return 0, 0, ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
return total, contentSize, nil
|
|
}
|
|
|
|
// GetTrashTotal returns the total space used by all trash
|
|
func (db *pieceSpaceUsedDB) GetTrashTotal(ctx context.Context) (total int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
row := db.QueryRowContext(ctx, `
|
|
SELECT total
|
|
FROM piece_space_used
|
|
WHERE satellite_id = ?
|
|
`, trashTotalRowName)
|
|
|
|
err = row.Scan(&total)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return total, nil
|
|
}
|
|
return total, ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
return total, nil
|
|
}
|
|
|
|
// GetPieceTotalsForAllSatellites returns how much space used by pieces stored for each satelliteID
|
|
func (db *pieceSpaceUsedDB) GetPieceTotalsForAllSatellites(ctx context.Context) (_ map[storj.NodeID]pieces.SatelliteUsage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
rows, err := db.QueryContext(ctx, `
|
|
SELECT total, content_size, satellite_id
|
|
FROM piece_space_used
|
|
WHERE satellite_id IS NOT NULL
|
|
AND satellite_id IS NOT ?
|
|
`, trashTotalRowName)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return nil, ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
totalBySatellite := map[storj.NodeID]pieces.SatelliteUsage{}
|
|
for rows.Next() {
|
|
var total, contentSize int64
|
|
var satelliteID storj.NodeID
|
|
|
|
err = rows.Scan(&total, &contentSize, &satelliteID)
|
|
if err != nil {
|
|
return nil, ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
totalBySatellite[satelliteID] = pieces.SatelliteUsage{
|
|
Total: total,
|
|
ContentSize: contentSize,
|
|
}
|
|
}
|
|
return totalBySatellite, rows.Err()
|
|
}
|
|
|
|
// UpdatePieceTotals updates the record for total spaced used with new total and contentSize values
|
|
func (db *pieceSpaceUsedDB) UpdatePieceTotals(ctx context.Context, newTotal, newContentSize int64) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.ExecContext(ctx, `
|
|
UPDATE piece_space_used
|
|
SET total = ?, content_size = ?
|
|
WHERE satellite_id IS NULL
|
|
`, newTotal, newContentSize)
|
|
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
|
|
// UpdateTrashTotal updates the record for total spaced used with a new value
|
|
func (db *pieceSpaceUsedDB) UpdateTrashTotal(ctx context.Context, newTotal int64) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.ExecContext(ctx, `
|
|
UPDATE piece_space_used
|
|
SET total = ?
|
|
WHERE satellite_id = ?
|
|
`, newTotal, trashTotalRowName)
|
|
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
|
|
// UpdatePieceTotalsForAllSatellites updates each record with new values for each satelliteID
|
|
func (db *pieceSpaceUsedDB) UpdatePieceTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]pieces.SatelliteUsage) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
for satelliteID, vals := range newTotalsBySatellites {
|
|
if vals.ContentSize == 0 && vals.Total == 0 {
|
|
if err := db.deleteTotalBySatellite(ctx, satelliteID); err != nil {
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
_, err = db.ExecContext(ctx, `
|
|
INSERT INTO piece_space_used (total, content_size, satellite_id)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT (satellite_id)
|
|
DO UPDATE SET total = ?, content_size = ?
|
|
WHERE satellite_id = ?
|
|
`, vals.Total, vals.ContentSize, satelliteID, vals.Total, vals.ContentSize, satelliteID)
|
|
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
continue
|
|
}
|
|
return ErrPieceSpaceUsed.Wrap(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *pieceSpaceUsedDB) deleteTotalBySatellite(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.ExecContext(ctx, `
|
|
DELETE FROM piece_space_used
|
|
WHERE satellite_id = ?
|
|
`, satelliteID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|