2019-09-25 18:12:44 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"bytes"
"context"
2019-10-11 22:18:05 +01:00
"database/sql"
2019-09-25 18:12:44 +01:00
"sort"
"time"
"github.com/lib/pq"
2019-10-11 22:18:05 +01:00
"github.com/zeebo/errs"
2019-09-25 18:12:44 +01:00
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/gracefulexit"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
type gracefulexitDB struct {
2019-12-14 02:29:54 +00:00
db * satelliteDB
2019-09-25 18:12:44 +01:00
}
// IncrementProgress increments transfer stats for a node.
func ( db * gracefulexitDB ) IncrementProgress ( ctx context . Context , nodeID storj . NodeID , bytes int64 , successfulTransfers int64 , failedTransfers int64 ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
statement := db . db . Rebind (
` INSERT INTO graceful_exit_progress ( node_id , bytes_transferred , pieces_transferred , pieces_failed , updated_at ) VALUES ( ? , ? , ? , ? , ? )
ON CONFLICT ( node_id )
DO UPDATE SET bytes_transferred = graceful_exit_progress . bytes_transferred + excluded . bytes_transferred ,
pieces_transferred = graceful_exit_progress . pieces_transferred + excluded . pieces_transferred ,
pieces_failed = graceful_exit_progress . pieces_failed + excluded . pieces_failed ,
updated_at = excluded . updated_at ; ` ,
)
now := time . Now ( ) . UTC ( )
_ , err = db . db . ExecContext ( ctx , statement , nodeID , bytes , successfulTransfers , failedTransfers , now )
if err != nil {
return Error . Wrap ( err )
}
return nil
}
// GetProgress gets a graceful exit progress entry.
func ( db * gracefulexitDB ) GetProgress ( ctx context . Context , nodeID storj . NodeID ) ( _ * gracefulexit . Progress , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
dbxProgress , err := db . db . Get_GracefulExitProgress_By_NodeId ( ctx , dbx . GracefulExitProgress_NodeId ( nodeID . Bytes ( ) ) )
2019-10-23 02:06:01 +01:00
if err == sql . ErrNoRows {
return nil , gracefulexit . ErrNodeNotFound . Wrap ( err )
} else if err != nil {
2019-09-25 18:12:44 +01:00
return nil , Error . Wrap ( err )
}
nID , err := storj . NodeIDFromBytes ( dbxProgress . NodeId )
if err != nil {
return nil , Error . Wrap ( err )
}
progress := & gracefulexit . Progress {
NodeID : nID ,
BytesTransferred : dbxProgress . BytesTransferred ,
PiecesTransferred : dbxProgress . PiecesTransferred ,
PiecesFailed : dbxProgress . PiecesFailed ,
UpdatedAt : dbxProgress . UpdatedAt ,
}
return progress , Error . Wrap ( err )
}
// Enqueue batch inserts graceful exit transfer queue entries it does not exist.
func ( db * gracefulexitDB ) Enqueue ( ctx context . Context , items [ ] gracefulexit . TransferQueueItem ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-10-18 22:27:57 +01:00
sort . Slice ( items , func ( i , k int ) bool {
compare := bytes . Compare ( items [ i ] . NodeID . Bytes ( ) , items [ k ] . NodeID . Bytes ( ) )
if compare == 0 {
return bytes . Compare ( items [ i ] . Path , items [ k ] . Path ) < 0
2019-09-25 18:12:44 +01:00
}
2019-10-18 22:27:57 +01:00
return compare < 0
} )
var nodeIDs [ ] storj . NodeID
var paths [ ] [ ] byte
var pieceNums [ ] int32
2019-11-07 16:13:05 +00:00
var rootPieceIDs [ ] [ ] byte
2019-10-18 22:27:57 +01:00
var durabilities [ ] float64
for _ , item := range items {
nodeIDs = append ( nodeIDs , item . NodeID )
paths = append ( paths , item . Path )
pieceNums = append ( pieceNums , item . PieceNum )
2019-11-07 16:13:05 +00:00
rootPieceIDs = append ( rootPieceIDs , item . RootPieceID . Bytes ( ) )
2019-10-18 22:27:57 +01:00
durabilities = append ( durabilities , item . DurabilityRatio )
}
2019-09-25 18:12:44 +01:00
2019-10-18 22:27:57 +01:00
_ , err = db . db . ExecContext ( ctx , db . db . Rebind ( `
2019-11-07 16:13:05 +00:00
INSERT INTO graceful_exit_transfer_queue ( node_id , path , piece_num , root_piece_id , durability_ratio , queued_at )
SELECT unnest ( $ 1 : : bytea [ ] ) , unnest ( $ 2 : : bytea [ ] ) , unnest ( $ 3 : : integer [ ] ) , unnest ( $ 4 : : bytea [ ] ) , unnest ( $ 5 : : float8 [ ] ) , $ 6
ON CONFLICT DO NOTHING ; ` ) , postgresNodeIDList ( nodeIDs ) , pq . ByteaArray ( paths ) , pq . Array ( pieceNums ) , pq . ByteaArray ( rootPieceIDs ) , pq . Array ( durabilities ) , time . Now ( ) . UTC ( ) )
2019-09-25 18:12:44 +01:00
2019-10-18 22:27:57 +01:00
return Error . Wrap ( err )
2019-09-25 18:12:44 +01:00
}
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
func ( db * gracefulexitDB ) UpdateTransferQueueItem ( ctx context . Context , item gracefulexit . TransferQueueItem ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
update := dbx . GracefulExitTransferQueue_Update_Fields {
DurabilityRatio : dbx . GracefulExitTransferQueue_DurabilityRatio ( item . DurabilityRatio ) ,
2019-10-11 22:18:05 +01:00
LastFailedCode : dbx . GracefulExitTransferQueue_LastFailedCode_Raw ( item . LastFailedCode ) ,
FailedCount : dbx . GracefulExitTransferQueue_FailedCount_Raw ( item . FailedCount ) ,
2019-09-25 18:12:44 +01:00
}
2019-10-11 22:18:05 +01:00
if item . RequestedAt != nil {
update . RequestedAt = dbx . GracefulExitTransferQueue_RequestedAt_Raw ( item . RequestedAt )
2019-09-25 18:12:44 +01:00
}
2019-10-11 22:18:05 +01:00
if item . LastFailedAt != nil {
update . LastFailedAt = dbx . GracefulExitTransferQueue_LastFailedAt_Raw ( item . LastFailedAt )
2019-09-25 18:12:44 +01:00
}
2019-10-11 22:18:05 +01:00
if item . FinishedAt != nil {
update . FinishedAt = dbx . GracefulExitTransferQueue_FinishedAt_Raw ( item . FinishedAt )
2019-09-25 18:12:44 +01:00
}
2019-10-28 15:08:33 +00:00
return db . db . UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum ( ctx ,
2019-09-25 18:12:44 +01:00
dbx . GracefulExitTransferQueue_NodeId ( item . NodeID . Bytes ( ) ) ,
dbx . GracefulExitTransferQueue_Path ( item . Path ) ,
2019-10-28 15:08:33 +00:00
dbx . GracefulExitTransferQueue_PieceNum ( int ( item . PieceNum ) ) ,
2019-09-25 18:12:44 +01:00
update ,
)
}
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
2019-10-28 15:08:33 +00:00
func ( db * gracefulexitDB ) DeleteTransferQueueItem ( ctx context . Context , nodeID storj . NodeID , path [ ] byte , pieceNum int32 ) ( err error ) {
2019-09-25 18:12:44 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-10-28 15:08:33 +00:00
_ , err = db . db . Delete_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum ( ctx , dbx . GracefulExitTransferQueue_NodeId ( nodeID . Bytes ( ) ) , dbx . GracefulExitTransferQueue_Path ( path ) ,
dbx . GracefulExitTransferQueue_PieceNum ( int ( pieceNum ) ) )
2019-09-25 18:12:44 +01:00
return Error . Wrap ( err )
}
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
func ( db * gracefulexitDB ) DeleteTransferQueueItems ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
_ , err = db . db . Delete_GracefulExitTransferQueue_By_NodeId ( ctx , dbx . GracefulExitTransferQueue_NodeId ( nodeID . Bytes ( ) ) )
return Error . Wrap ( err )
}
// DeleteFinishedTransferQueueItem deletes finiahed graceful exit transfer queue entries by nodeID.
func ( db * gracefulexitDB ) DeleteFinishedTransferQueueItems ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
_ , err = db . db . Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null ( ctx , dbx . GracefulExitTransferQueue_NodeId ( nodeID . Bytes ( ) ) )
return Error . Wrap ( err )
}
// GetTransferQueueItem gets a graceful exit transfer queue entry.
2019-10-28 15:08:33 +00:00
func ( db * gracefulexitDB ) GetTransferQueueItem ( ctx context . Context , nodeID storj . NodeID , path [ ] byte , pieceNum int32 ) ( _ * gracefulexit . TransferQueueItem , err error ) {
2019-09-25 18:12:44 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-10-28 15:08:33 +00:00
dbxTransferQueue , err := db . db . Get_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum ( ctx ,
2019-09-25 18:12:44 +01:00
dbx . GracefulExitTransferQueue_NodeId ( nodeID . Bytes ( ) ) ,
2019-10-28 15:08:33 +00:00
dbx . GracefulExitTransferQueue_Path ( path ) ,
dbx . GracefulExitTransferQueue_PieceNum ( int ( pieceNum ) ) )
2019-09-25 18:12:44 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
transferQueueItem , err := dbxToTransferQueueItem ( dbxTransferQueue )
if err != nil {
return nil , Error . Wrap ( err )
}
return transferQueueItem , Error . Wrap ( err )
}
2019-10-11 22:18:05 +01:00
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
2019-09-25 18:12:44 +01:00
func ( db * gracefulexitDB ) GetIncomplete ( ctx context . Context , nodeID storj . NodeID , limit int , offset int64 ) ( _ [ ] * gracefulexit . TransferQueueItem , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-13 14:54:50 +00:00
sql := ` SELECT node_id , path , piece_num , root_piece_id , durability_ratio , queued_at , requested_at , last_failed_at , last_failed_code , failed_count , finished_at , order_limit_send_count
2019-10-11 22:18:05 +01:00
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
ORDER BY durability_ratio asc , queued_at asc LIMIT ? OFFSET ? `
rows , err := db . db . Query ( db . db . Rebind ( sql ) , nodeID . Bytes ( ) , limit , offset )
2019-09-25 18:12:44 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
2019-10-11 22:18:05 +01:00
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
transferQueueItemRows , err := scanRows ( rows )
if err != nil {
return nil , Error . Wrap ( err )
}
return transferQueueItemRows , nil
}
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending.
func ( db * gracefulexitDB ) GetIncompleteNotFailed ( ctx context . Context , nodeID storj . NodeID , limit int , offset int64 ) ( _ [ ] * gracefulexit . TransferQueueItem , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-13 14:54:50 +00:00
sql := ` SELECT node_id , path , piece_num , root_piece_id , durability_ratio , queued_at , requested_at , last_failed_at , last_failed_code , failed_count , finished_at , order_limit_send_count
2019-10-11 22:18:05 +01:00
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
AND last_failed_at is NULL
ORDER BY durability_ratio asc , queued_at asc LIMIT ? OFFSET ? `
rows , err := db . db . Query ( db . db . Rebind ( sql ) , nodeID . Bytes ( ) , limit , offset )
if err != nil {
return nil , Error . Wrap ( err )
}
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
transferQueueItemRows , err := scanRows ( rows )
if err != nil {
return nil , Error . Wrap ( err )
}
return transferQueueItemRows , nil
}
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
func ( db * gracefulexitDB ) GetIncompleteFailed ( ctx context . Context , nodeID storj . NodeID , maxFailures int , limit int , offset int64 ) ( _ [ ] * gracefulexit . TransferQueueItem , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-13 14:54:50 +00:00
sql := ` SELECT node_id , path , piece_num , root_piece_id , durability_ratio , queued_at , requested_at , last_failed_at , last_failed_code , failed_count , finished_at , order_limit_send_count
2019-10-11 22:18:05 +01:00
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
AND last_failed_at is not NULL
2019-10-24 17:24:42 +01:00
AND failed_count < ?
2019-10-11 22:18:05 +01:00
ORDER BY durability_ratio asc , queued_at asc LIMIT ? OFFSET ? `
rows , err := db . db . Query ( db . db . Rebind ( sql ) , nodeID . Bytes ( ) , maxFailures , limit , offset )
if err != nil {
return nil , Error . Wrap ( err )
}
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
transferQueueItemRows , err := scanRows ( rows )
if err != nil {
return nil , Error . Wrap ( err )
}
return transferQueueItemRows , nil
}
2019-11-13 14:54:50 +00:00
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
func ( db * gracefulexitDB ) IncrementOrderLimitSendCount ( ctx context . Context , nodeID storj . NodeID , path [ ] byte , pieceNum int32 ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
sql := db . db . Rebind (
` UPDATE graceful_exit_transfer_queue SET order_limit_send_count = graceful_exit_transfer_queue . order_limit_send_count + 1
WHERE node_id = ?
AND path = ?
AND piece_num = ? ` ,
)
_ , err = db . db . ExecContext ( ctx , sql , nodeID , path , pieceNum )
if err != nil {
return Error . Wrap ( err )
}
return nil
}
2019-10-11 22:18:05 +01:00
func scanRows ( rows * sql . Rows ) ( transferQueueItemRows [ ] * gracefulexit . TransferQueueItem , err error ) {
for rows . Next ( ) {
transferQueueItem := & gracefulexit . TransferQueueItem { }
2019-11-07 16:13:05 +00:00
var pieceIDBytes [ ] byte
err = rows . Scan ( & transferQueueItem . NodeID , & transferQueueItem . Path , & transferQueueItem . PieceNum , & pieceIDBytes ,
& transferQueueItem . DurabilityRatio , & transferQueueItem . QueuedAt , & transferQueueItem . RequestedAt , & transferQueueItem . LastFailedAt ,
2019-11-13 14:54:50 +00:00
& transferQueueItem . LastFailedCode , & transferQueueItem . FailedCount , & transferQueueItem . FinishedAt , & transferQueueItem . OrderLimitSendCount )
2019-09-25 18:12:44 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
2019-11-07 16:13:05 +00:00
if pieceIDBytes != nil {
transferQueueItem . RootPieceID , err = storj . PieceIDFromBytes ( pieceIDBytes )
if err != nil {
return nil , Error . Wrap ( err )
}
}
2019-09-25 18:12:44 +01:00
2019-10-11 22:18:05 +01:00
transferQueueItemRows = append ( transferQueueItemRows , transferQueueItem )
}
2019-09-25 18:12:44 +01:00
return transferQueueItemRows , nil
}
func dbxToTransferQueueItem ( dbxTransferQueue * dbx . GracefulExitTransferQueue ) ( item * gracefulexit . TransferQueueItem , err error ) {
nID , err := storj . NodeIDFromBytes ( dbxTransferQueue . NodeId )
if err != nil {
return nil , Error . Wrap ( err )
}
item = & gracefulexit . TransferQueueItem {
2019-11-13 14:54:50 +00:00
NodeID : nID ,
Path : dbxTransferQueue . Path ,
PieceNum : int32 ( dbxTransferQueue . PieceNum ) ,
DurabilityRatio : dbxTransferQueue . DurabilityRatio ,
QueuedAt : dbxTransferQueue . QueuedAt ,
OrderLimitSendCount : dbxTransferQueue . OrderLimitSendCount ,
2019-09-25 18:12:44 +01:00
}
2019-11-07 16:13:05 +00:00
if dbxTransferQueue . RootPieceId != nil {
item . RootPieceID , err = storj . PieceIDFromBytes ( dbxTransferQueue . RootPieceId )
if err != nil {
return nil , err
}
}
2019-09-25 18:12:44 +01:00
if dbxTransferQueue . LastFailedCode != nil {
2019-10-11 22:18:05 +01:00
item . LastFailedCode = dbxTransferQueue . LastFailedCode
2019-09-25 18:12:44 +01:00
}
if dbxTransferQueue . FailedCount != nil {
2019-10-11 22:18:05 +01:00
item . FailedCount = dbxTransferQueue . FailedCount
2019-09-25 18:12:44 +01:00
}
if dbxTransferQueue . RequestedAt != nil && ! dbxTransferQueue . RequestedAt . IsZero ( ) {
2019-10-11 22:18:05 +01:00
item . RequestedAt = dbxTransferQueue . RequestedAt
2019-09-25 18:12:44 +01:00
}
if dbxTransferQueue . LastFailedAt != nil && ! dbxTransferQueue . LastFailedAt . IsZero ( ) {
2019-10-11 22:18:05 +01:00
item . LastFailedAt = dbxTransferQueue . LastFailedAt
2019-09-25 18:12:44 +01:00
}
if dbxTransferQueue . FinishedAt != nil && ! dbxTransferQueue . FinishedAt . IsZero ( ) {
2019-10-11 22:18:05 +01:00
item . FinishedAt = dbxTransferQueue . FinishedAt
2019-09-25 18:12:44 +01:00
}
return item , nil
}