235 lines
8.7 KiB
Go
235 lines
8.7 KiB
Go
|
// Copyright (C) 2019 Storj Labs, Inc.
|
||
|
// See LICENSE for copying information.
|
||
|
|
||
|
package satellitedb
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"sort"
|
||
|
"time"
|
||
|
|
||
|
"github.com/lib/pq"
|
||
|
sqlite3 "github.com/mattn/go-sqlite3"
|
||
|
|
||
|
"storj.io/storj/pkg/storj"
|
||
|
"storj.io/storj/satellite/gracefulexit"
|
||
|
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||
|
)
|
||
|
|
||
|
type gracefulexitDB struct {
|
||
|
db *dbx.DB
|
||
|
}
|
||
|
|
||
|
// IncrementProgress increments transfer stats for a node.
|
||
|
func (db *gracefulexitDB) IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
statement := db.db.Rebind(
|
||
|
`INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at) VALUES (?, ?, ?, ?, ?)
|
||
|
ON CONFLICT(node_id)
|
||
|
DO UPDATE SET bytes_transferred = graceful_exit_progress.bytes_transferred + excluded.bytes_transferred,
|
||
|
pieces_transferred = graceful_exit_progress.pieces_transferred + excluded.pieces_transferred,
|
||
|
pieces_failed = graceful_exit_progress.pieces_failed + excluded.pieces_failed,
|
||
|
updated_at = excluded.updated_at;`,
|
||
|
)
|
||
|
now := time.Now().UTC()
|
||
|
_, err = db.db.ExecContext(ctx, statement, nodeID, bytes, successfulTransfers, failedTransfers, now)
|
||
|
if err != nil {
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// GetProgress gets a graceful exit progress entry.
|
||
|
func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) (_ *gracefulexit.Progress, err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
dbxProgress, err := db.db.Get_GracefulExitProgress_By_NodeId(ctx, dbx.GracefulExitProgress_NodeId(nodeID.Bytes()))
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
nID, err := storj.NodeIDFromBytes(dbxProgress.NodeId)
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
progress := &gracefulexit.Progress{
|
||
|
NodeID: nID,
|
||
|
BytesTransferred: dbxProgress.BytesTransferred,
|
||
|
PiecesTransferred: dbxProgress.PiecesTransferred,
|
||
|
PiecesFailed: dbxProgress.PiecesFailed,
|
||
|
UpdatedAt: dbxProgress.UpdatedAt,
|
||
|
}
|
||
|
|
||
|
return progress, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// Enqueue batch inserts graceful exit transfer queue entries it does not exist.
|
||
|
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
switch t := db.db.Driver().(type) {
|
||
|
case *sqlite3.SQLiteDriver:
|
||
|
statement := db.db.Rebind(
|
||
|
`INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, durability_ratio, queued_at)
|
||
|
VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;`,
|
||
|
)
|
||
|
for _, item := range items {
|
||
|
_, err = db.db.ExecContext(ctx, statement,
|
||
|
item.NodeID.Bytes(), item.Path, item.PieceNum, item.DurabilityRatio, time.Now().UTC())
|
||
|
if err != nil {
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
}
|
||
|
case *pq.Driver:
|
||
|
sort.Slice(items, func(i, k int) bool {
|
||
|
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
|
||
|
if compare == 0 {
|
||
|
return bytes.Compare(items[i].Path, items[k].Path) < 0
|
||
|
}
|
||
|
return compare < 0
|
||
|
})
|
||
|
|
||
|
var nodeIDs []storj.NodeID
|
||
|
var paths [][]byte
|
||
|
var pieceNums []int32
|
||
|
var durabilities []float64
|
||
|
for _, item := range items {
|
||
|
nodeIDs = append(nodeIDs, item.NodeID)
|
||
|
paths = append(paths, item.Path)
|
||
|
pieceNums = append(pieceNums, item.PieceNum)
|
||
|
durabilities = append(durabilities, item.DurabilityRatio)
|
||
|
}
|
||
|
|
||
|
_, err := db.db.ExecContext(ctx, `
|
||
|
INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, durability_ratio, queued_at)
|
||
|
SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::integer[]), unnest($4::float8[]), $5
|
||
|
ON CONFLICT DO NOTHING;`, postgresNodeIDList(nodeIDs), pq.ByteaArray(paths), pq.Array(pieceNums), pq.Array(durabilities), time.Now().UTC())
|
||
|
if err != nil {
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
default:
|
||
|
return Error.New("Unsupported database %t", t)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
|
||
|
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
update := dbx.GracefulExitTransferQueue_Update_Fields{
|
||
|
DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio),
|
||
|
LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(&item.LastFailedCode),
|
||
|
FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(&item.FailedCount),
|
||
|
}
|
||
|
|
||
|
if !item.RequestedAt.IsZero() {
|
||
|
update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(&item.RequestedAt)
|
||
|
}
|
||
|
if !item.LastFailedAt.IsZero() {
|
||
|
update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(&item.LastFailedAt)
|
||
|
}
|
||
|
if !item.FinishedAt.IsZero() {
|
||
|
update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(&item.FinishedAt)
|
||
|
}
|
||
|
|
||
|
return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path(ctx,
|
||
|
dbx.GracefulExitTransferQueue_NodeId(item.NodeID.Bytes()),
|
||
|
dbx.GracefulExitTransferQueue_Path(item.Path),
|
||
|
update,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
|
||
|
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_Path(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), dbx.GracefulExitTransferQueue_Path(path))
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
|
||
|
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// DeleteFinishedTransferQueueItem deletes finiahed graceful exit transfer queue entries by nodeID.
|
||
|
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// GetTransferQueueItem gets a graceful exit transfer queue entry.
|
||
|
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte) (_ *gracefulexit.TransferQueueItem, err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
dbxTransferQueue, err := db.db.Get_GracefulExitTransferQueue_By_NodeId_And_Path(ctx,
|
||
|
dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()),
|
||
|
dbx.GracefulExitTransferQueue_Path(path))
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue)
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
return transferQueueItem, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// GetIncomplete gets incomplete graceful exit transfer queue entries in the database ordered by the queued date ascending.
|
||
|
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
dbxTransferQueueItemRows, err := db.db.Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), limit, offset)
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
var transferQueueItemRows = make([]*gracefulexit.TransferQueueItem, len(dbxTransferQueueItemRows))
|
||
|
for i, dbxTransferQueue := range dbxTransferQueueItemRows {
|
||
|
transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue)
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
transferQueueItemRows[i] = transferQueueItem
|
||
|
}
|
||
|
|
||
|
return transferQueueItemRows, nil
|
||
|
}
|
||
|
|
||
|
func dbxToTransferQueueItem(dbxTransferQueue *dbx.GracefulExitTransferQueue) (item *gracefulexit.TransferQueueItem, err error) {
|
||
|
nID, err := storj.NodeIDFromBytes(dbxTransferQueue.NodeId)
|
||
|
if err != nil {
|
||
|
return nil, Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
item = &gracefulexit.TransferQueueItem{
|
||
|
NodeID: nID,
|
||
|
Path: dbxTransferQueue.Path,
|
||
|
PieceNum: int32(dbxTransferQueue.PieceNum),
|
||
|
DurabilityRatio: dbxTransferQueue.DurabilityRatio,
|
||
|
QueuedAt: dbxTransferQueue.QueuedAt,
|
||
|
}
|
||
|
if dbxTransferQueue.LastFailedCode != nil {
|
||
|
item.LastFailedCode = *dbxTransferQueue.LastFailedCode
|
||
|
}
|
||
|
if dbxTransferQueue.FailedCount != nil {
|
||
|
item.FailedCount = *dbxTransferQueue.FailedCount
|
||
|
}
|
||
|
if dbxTransferQueue.RequestedAt != nil && !dbxTransferQueue.RequestedAt.IsZero() {
|
||
|
item.RequestedAt = *dbxTransferQueue.RequestedAt
|
||
|
}
|
||
|
if dbxTransferQueue.LastFailedAt != nil && !dbxTransferQueue.LastFailedAt.IsZero() {
|
||
|
item.LastFailedAt = *dbxTransferQueue.LastFailedAt
|
||
|
}
|
||
|
if dbxTransferQueue.FinishedAt != nil && !dbxTransferQueue.FinishedAt.IsZero() {
|
||
|
item.FinishedAt = *dbxTransferQueue.FinishedAt
|
||
|
}
|
||
|
|
||
|
return item, nil
|
||
|
}
|