f654c3ae4c
We don't need column uses_segment_transfer_queue in graceful_exit_progress as now all exiting nodes are using graceful_exit_segment_transfer_queue and table graceful_exit_transfer_queue has been dropped. Change-Id: I4b7c087433f04138cf09bcf8ad3d8de2c185502a
611 lines
20 KiB
Go
611 lines
20 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/storj"
|
|
"storj.io/common/uuid"
|
|
"storj.io/private/dbutil"
|
|
"storj.io/private/dbutil/pgutil"
|
|
"storj.io/private/tagsql"
|
|
"storj.io/storj/satellite/gracefulexit"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/satellitedb/dbx"
|
|
)
|
|
|
|
type gracefulexitDB struct {
|
|
db *satelliteDB
|
|
}
|
|
|
|
const (
|
|
deleteExitProgressBatchSize = 1000
|
|
)
|
|
|
|
// IncrementProgress increments transfer stats for a node.
|
|
func (db *gracefulexitDB) IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
statement := db.db.Rebind(
|
|
`INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at) VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(node_id)
|
|
DO UPDATE SET bytes_transferred = graceful_exit_progress.bytes_transferred + excluded.bytes_transferred,
|
|
pieces_transferred = graceful_exit_progress.pieces_transferred + excluded.pieces_transferred,
|
|
pieces_failed = graceful_exit_progress.pieces_failed + excluded.pieces_failed,
|
|
updated_at = excluded.updated_at;`,
|
|
)
|
|
now := time.Now().UTC()
|
|
_, err = db.db.ExecContext(ctx, statement, nodeID, bytes, successfulTransfers, failedTransfers, now)
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetProgress gets a graceful exit progress entry.
|
|
func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) (_ *gracefulexit.Progress, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
dbxProgress, err := db.db.Get_GracefulExitProgress_By_NodeId(ctx, dbx.GracefulExitProgress_NodeId(nodeID.Bytes()))
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, gracefulexit.ErrNodeNotFound.Wrap(err)
|
|
} else if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
nID, err := storj.NodeIDFromBytes(dbxProgress.NodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
progress := &gracefulexit.Progress{
|
|
NodeID: nID,
|
|
BytesTransferred: dbxProgress.BytesTransferred,
|
|
PiecesTransferred: dbxProgress.PiecesTransferred,
|
|
PiecesFailed: dbxProgress.PiecesFailed,
|
|
UpdatedAt: dbxProgress.UpdatedAt,
|
|
}
|
|
|
|
return progress, Error.Wrap(err)
|
|
}
|
|
|
|
// Enqueue batch inserts graceful exit transfer queue entries if it does not exist.
|
|
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sort.Slice(items, func(i, k int) bool {
|
|
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
|
|
if compare == 0 {
|
|
compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:])
|
|
if compare == 0 {
|
|
return items[i].Position.Encode() < items[k].Position.Encode()
|
|
}
|
|
return compare < 0
|
|
}
|
|
return compare < 0
|
|
})
|
|
|
|
for i := 0; i < len(items); i += batchSize {
|
|
lowerBound := i
|
|
upperBound := lowerBound + batchSize
|
|
|
|
if upperBound > len(items) {
|
|
upperBound = len(items)
|
|
}
|
|
|
|
var nodeIDs []storj.NodeID
|
|
var streamIds [][]byte
|
|
var positions []int64
|
|
var pieceNums []int32
|
|
var rootPieceIDs [][]byte
|
|
var durabilities []float64
|
|
|
|
for _, item := range items[lowerBound:upperBound] {
|
|
item := item
|
|
nodeIDs = append(nodeIDs, item.NodeID)
|
|
streamIds = append(streamIds, item.StreamID[:])
|
|
positions = append(positions, int64(item.Position.Encode()))
|
|
pieceNums = append(pieceNums, item.PieceNum)
|
|
rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
|
|
durabilities = append(durabilities, item.DurabilityRatio)
|
|
}
|
|
|
|
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
|
|
INSERT INTO graceful_exit_segment_transfer_queue (
|
|
node_id, stream_id, position, piece_num,
|
|
root_piece_id, durability_ratio, queued_at
|
|
) SELECT
|
|
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int8[]),
|
|
unnest($4::int4[]), unnest($5::bytea[]), unnest($6::float8[]),
|
|
$7
|
|
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(streamIds), pgutil.Int8Array(positions),
|
|
pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities),
|
|
time.Now().UTC())
|
|
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
|
|
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
update := dbx.GracefulExitSegmentTransfer_Update_Fields{
|
|
DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio),
|
|
LastFailedCode: dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode),
|
|
FailedCount: dbx.GracefulExitSegmentTransfer_FailedCount_Raw(item.FailedCount),
|
|
}
|
|
|
|
if item.RequestedAt != nil {
|
|
update.RequestedAt = dbx.GracefulExitSegmentTransfer_RequestedAt_Raw(item.RequestedAt)
|
|
}
|
|
if item.LastFailedAt != nil {
|
|
update.LastFailedAt = dbx.GracefulExitSegmentTransfer_LastFailedAt_Raw(item.LastFailedAt)
|
|
}
|
|
if item.FinishedAt != nil {
|
|
update.FinishedAt = dbx.GracefulExitSegmentTransfer_FinishedAt_Raw(item.FinishedAt)
|
|
}
|
|
|
|
return db.db.UpdateNoReturn_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
|
dbx.GracefulExitSegmentTransfer_NodeId(item.NodeID.Bytes()),
|
|
dbx.GracefulExitSegmentTransfer_StreamId(item.StreamID[:]),
|
|
dbx.GracefulExitSegmentTransfer_Position(item.Position.Encode()),
|
|
dbx.GracefulExitSegmentTransfer_PieceNum(int(item.PieceNum)),
|
|
update,
|
|
)
|
|
}
|
|
|
|
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
|
|
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
|
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
|
|
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
|
|
dbx.GracefulExitSegmentTransfer_Position(position.Encode()), dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
|
|
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
|
|
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries by nodeID.
|
|
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
|
|
// queue items whose nodes have finished the exit before the indicated time
|
|
// returning the total number of deleted items.
|
|
func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
|
|
ctx context.Context, before time.Time, asOfSystemInterval time.Duration, batchSize int) (_ int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
switch db.db.impl {
|
|
case dbutil.Postgres:
|
|
statement := `
|
|
DELETE FROM graceful_exit_segment_transfer_queue
|
|
WHERE node_id IN (
|
|
SELECT node_id FROM graceful_exit_segment_transfer_queue INNER JOIN nodes
|
|
ON graceful_exit_segment_transfer_queue.node_id = nodes.id
|
|
WHERE nodes.exit_finished_at IS NOT NULL
|
|
AND nodes.exit_finished_at < $1
|
|
)`
|
|
res, err := db.db.ExecContext(ctx, statement, before)
|
|
if err != nil {
|
|
return 0, Error.Wrap(err)
|
|
}
|
|
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
return 0, Error.Wrap(err)
|
|
}
|
|
|
|
return count, nil
|
|
|
|
case dbutil.Cockroach:
|
|
nodesQuery := `
|
|
SELECT id
|
|
FROM nodes
|
|
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
|
|
WHERE exit_finished_at IS NOT NULL
|
|
AND exit_finished_at < $1
|
|
LIMIT $2 OFFSET $3
|
|
`
|
|
deleteStmt := `
|
|
DELETE FROM graceful_exit_segment_transfer_queue
|
|
WHERE node_id = $1
|
|
LIMIT $2
|
|
`
|
|
|
|
var (
|
|
deleteCount int64
|
|
offset int
|
|
)
|
|
for {
|
|
var nodeIDs storj.NodeIDList
|
|
deleteItems := func() (int64, error) {
|
|
// Select exited nodes
|
|
rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset)
|
|
if err != nil {
|
|
return deleteCount, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
count := 0
|
|
for rows.Next() {
|
|
var id storj.NodeID
|
|
if err = rows.Scan(&id); err != nil {
|
|
return deleteCount, Error.Wrap(err)
|
|
}
|
|
nodeIDs = append(nodeIDs, id)
|
|
count++
|
|
}
|
|
|
|
if count == batchSize {
|
|
offset += count
|
|
} else {
|
|
offset = -1 // indicates that there aren't more nodes to query
|
|
}
|
|
|
|
for _, id := range nodeIDs {
|
|
for {
|
|
res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize)
|
|
if err != nil {
|
|
return deleteCount, Error.Wrap(err)
|
|
}
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
return deleteCount, Error.Wrap(err)
|
|
}
|
|
deleteCount += count
|
|
if count < int64(batchSize) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return deleteCount, nil
|
|
}
|
|
deleteCount, err = deleteItems()
|
|
if err != nil {
|
|
return deleteCount, err
|
|
}
|
|
// when offset is negative means that we have get already all the nodes
|
|
// which have exited
|
|
if offset < 0 {
|
|
break
|
|
}
|
|
}
|
|
return deleteCount, nil
|
|
}
|
|
|
|
return 0, Error.New("unsupported implementation: %s", db.db.impl)
|
|
}
|
|
|
|
// DeleteFinishedExitProgress deletes exit progress entries for nodes that
|
|
// finished exiting before the indicated time, returns number of deleted entries.
|
|
func (db *gracefulexitDB) DeleteFinishedExitProgress(
|
|
ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemInterval)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return db.DeleteBatchExitProgress(ctx, finishedNodes)
|
|
}
|
|
|
|
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
|
|
func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (finishedNodes []storj.NodeID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
stmt := `
|
|
SELECT id
|
|
FROM nodes
|
|
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
|
|
WHERE exit_finished_at IS NOT NULL
|
|
AND exit_finished_at < ?
|
|
`
|
|
rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC())
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() {
|
|
err = Error.Wrap(errs.Combine(err, rows.Close()))
|
|
}()
|
|
|
|
for rows.Next() {
|
|
var id storj.NodeID
|
|
err = rows.Scan(&id)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
finishedNodes = append(finishedNodes, id)
|
|
}
|
|
return finishedNodes, Error.Wrap(rows.Err())
|
|
}
|
|
|
|
// DeleteBatchExitProgress batch deletes from exit progress. This is separate from
|
|
// getting the node IDs because the combined query is slow in CRDB. It's safe to do
|
|
// separately because if nodes are deleted between the get and delete, it doesn't
|
|
// affect correctness.
|
|
func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs []storj.NodeID) (deleted int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
stmt := `DELETE from graceful_exit_progress
|
|
WHERE node_id = ANY($1)`
|
|
for len(nodeIDs) > 0 {
|
|
numToSubmit := len(nodeIDs)
|
|
if numToSubmit > deleteExitProgressBatchSize {
|
|
numToSubmit = deleteExitProgressBatchSize
|
|
}
|
|
nodesToSubmit := nodeIDs[:numToSubmit]
|
|
res, err := db.db.ExecContext(ctx, stmt, pgutil.NodeIDArray(nodesToSubmit))
|
|
if err != nil {
|
|
return deleted, Error.Wrap(err)
|
|
}
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
return deleted, Error.Wrap(err)
|
|
}
|
|
deleted += count
|
|
nodeIDs = nodeIDs[numToSubmit:]
|
|
}
|
|
return deleted, Error.Wrap(err)
|
|
}
|
|
|
|
// GetTransferQueueItem gets a graceful exit transfer queue entry.
|
|
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
|
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
|
|
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
|
|
dbx.GracefulExitSegmentTransfer_Position(position.Encode()),
|
|
dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
|
|
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
transferQueueItem, err := dbxSegmentTransferToTransferQueueItem(dbxTransferQueue)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
return transferQueueItem, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
|
|
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sql := `
|
|
SELECT
|
|
node_id, stream_id, position,
|
|
piece_num, root_piece_id, durability_ratio,
|
|
queued_at, requested_at, last_failed_at,
|
|
last_failed_code, failed_count, finished_at,
|
|
order_limit_send_count
|
|
FROM graceful_exit_segment_transfer_queue
|
|
WHERE node_id = ?
|
|
AND finished_at is NULL
|
|
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
|
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
transferQueueItemRows, err := scanRows(rows)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
return transferQueueItemRows, nil
|
|
}
|
|
|
|
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending.
|
|
func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sql := `
|
|
SELECT
|
|
node_id, stream_id, position,
|
|
piece_num, root_piece_id, durability_ratio,
|
|
queued_at, requested_at, last_failed_at,
|
|
last_failed_code, failed_count, finished_at,
|
|
order_limit_send_count
|
|
FROM graceful_exit_segment_transfer_queue
|
|
WHERE node_id = ?
|
|
AND finished_at is NULL
|
|
AND last_failed_at is NULL
|
|
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
|
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
transferQueueItemRows, err := scanRows(rows)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
return transferQueueItemRows, nil
|
|
}
|
|
|
|
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
|
|
func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sql := `
|
|
SELECT
|
|
node_id, stream_id, position,
|
|
piece_num, root_piece_id, durability_ratio,
|
|
queued_at, requested_at, last_failed_at,
|
|
last_failed_code, failed_count, finished_at,
|
|
order_limit_send_count
|
|
FROM graceful_exit_segment_transfer_queue
|
|
WHERE node_id = ?
|
|
AND finished_at is NULL
|
|
AND last_failed_at is not NULL
|
|
AND failed_count < ?
|
|
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
|
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
transferQueueItemRows, err := scanRows(rows)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
return transferQueueItemRows, nil
|
|
}
|
|
|
|
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
|
|
func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sql := `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1
|
|
WHERE node_id = ?
|
|
AND stream_id = ?
|
|
AND position = ?
|
|
AND piece_num = ?`
|
|
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum)
|
|
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
|
|
// finished the exit before the indicated time but there are at least one item
|
|
// left in the transfer queue.
|
|
func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ map[storj.NodeID]int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := `SELECT n.id, count(getq.node_id)
|
|
FROM nodes as n INNER JOIN graceful_exit_segment_transfer_queue as getq
|
|
ON n.id = getq.node_id
|
|
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
|
|
WHERE n.exit_finished_at IS NOT NULL
|
|
AND n.exit_finished_at < ?
|
|
GROUP BY n.id`
|
|
|
|
statement := db.db.Rebind(query)
|
|
|
|
rows, err := db.db.QueryContext(ctx, statement, before)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) }()
|
|
|
|
nodesItemsCount := make(map[storj.NodeID]int64)
|
|
for rows.Next() {
|
|
var (
|
|
nodeID storj.NodeID
|
|
n int64
|
|
)
|
|
err := rows.Scan(&nodeID, &n)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
nodesItemsCount[nodeID] = n
|
|
}
|
|
|
|
return nodesItemsCount, Error.Wrap(rows.Err())
|
|
}
|
|
|
|
func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
|
|
for rows.Next() {
|
|
transferQueueItem := &gracefulexit.TransferQueueItem{}
|
|
var pieceIDBytes []byte
|
|
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes,
|
|
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
|
|
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
|
|
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
if pieceIDBytes != nil {
|
|
transferQueueItem.RootPieceID, err = storj.PieceIDFromBytes(pieceIDBytes)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
}
|
|
|
|
transferQueueItemRows = append(transferQueueItemRows, transferQueueItem)
|
|
}
|
|
return transferQueueItemRows, Error.Wrap(rows.Err())
|
|
}
|
|
|
|
func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) {
|
|
nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
streamID, err := uuid.FromBytes(dbxSegmentTransfer.StreamId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
position := metabase.SegmentPositionFromEncoded(dbxSegmentTransfer.Position)
|
|
|
|
item = &gracefulexit.TransferQueueItem{
|
|
NodeID: nID,
|
|
StreamID: streamID,
|
|
Position: position,
|
|
PieceNum: int32(dbxSegmentTransfer.PieceNum),
|
|
DurabilityRatio: dbxSegmentTransfer.DurabilityRatio,
|
|
QueuedAt: dbxSegmentTransfer.QueuedAt,
|
|
OrderLimitSendCount: dbxSegmentTransfer.OrderLimitSendCount,
|
|
}
|
|
if dbxSegmentTransfer.RootPieceId != nil {
|
|
item.RootPieceID, err = storj.PieceIDFromBytes(dbxSegmentTransfer.RootPieceId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if dbxSegmentTransfer.LastFailedCode != nil {
|
|
item.LastFailedCode = dbxSegmentTransfer.LastFailedCode
|
|
}
|
|
if dbxSegmentTransfer.FailedCount != nil {
|
|
item.FailedCount = dbxSegmentTransfer.FailedCount
|
|
}
|
|
if dbxSegmentTransfer.RequestedAt != nil && !dbxSegmentTransfer.RequestedAt.IsZero() {
|
|
item.RequestedAt = dbxSegmentTransfer.RequestedAt
|
|
}
|
|
if dbxSegmentTransfer.LastFailedAt != nil && !dbxSegmentTransfer.LastFailedAt.IsZero() {
|
|
item.LastFailedAt = dbxSegmentTransfer.LastFailedAt
|
|
}
|
|
if dbxSegmentTransfer.FinishedAt != nil && !dbxSegmentTransfer.FinishedAt.IsZero() {
|
|
item.FinishedAt = dbxSegmentTransfer.FinishedAt
|
|
}
|
|
|
|
return item, nil
|
|
}
|