2019-07-12 18:35:20 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
2019-08-26 11:19:02 +01:00
package queue_test
2019-07-12 18:35:20 +01:00
import (
2019-07-22 15:10:42 +01:00
"context"
2020-02-21 21:32:05 +00:00
"math/rand"
2020-06-12 07:35:26 +01:00
"strconv"
2019-07-12 18:35:20 +01:00
"testing"
2019-07-22 15:10:42 +01:00
"time"
2019-07-12 18:35:20 +01:00
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
2019-12-27 11:48:47 +00:00
"storj.io/common/testcontext"
2019-07-12 18:35:20 +01:00
"storj.io/storj/satellite"
2020-10-30 10:41:22 +00:00
"storj.io/storj/satellite/internalpb"
2020-01-15 02:29:51 +00:00
"storj.io/storj/satellite/satellitedb/dbx"
2019-07-12 18:35:20 +01:00
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
)
2019-08-26 11:19:02 +01:00
func TestUntilEmpty ( t * testing . T ) {
2020-01-19 16:29:15 +00:00
satellitedbtest . Run ( t , func ( ctx * testcontext . Context , t * testing . T , db satellite . DB ) {
2019-07-12 18:35:20 +01:00
repairQueue := db . RepairQueue ( )
// insert a bunch of segments
pathsMap := make ( map [ string ] int )
2020-05-22 20:54:05 +01:00
for i := 0 ; i < 20 ; i ++ {
2020-06-12 07:35:26 +01:00
path := "/path/" + strconv . Itoa ( i )
2020-10-30 10:41:22 +00:00
injuredSeg := & internalpb . InjuredSegment { Path : [ ] byte ( path ) }
2020-05-22 20:54:05 +01:00
alreadyInserted , err := repairQueue . Insert ( ctx , injuredSeg , 10 )
2019-07-12 18:35:20 +01:00
require . NoError ( t , err )
2020-05-22 20:54:05 +01:00
require . False ( t , alreadyInserted )
2019-07-12 18:35:20 +01:00
pathsMap [ path ] = 0
}
// select segments until no more are returned, and we should get each one exactly once
for {
injuredSeg , err := repairQueue . Select ( ctx )
if err != nil {
require . True ( t , storage . ErrEmptyQueue . Has ( err ) )
break
}
pathsMap [ string ( injuredSeg . Path ) ] ++
}
for _ , selectCount := range pathsMap {
assert . Equal ( t , selectCount , 1 )
}
} )
}
2019-07-22 15:10:42 +01:00
2019-08-26 11:19:02 +01:00
func TestOrder ( t * testing . T ) {
2020-01-19 16:29:15 +00:00
satellitedbtest . Run ( t , func ( ctx * testcontext . Context , t * testing . T , db satellite . DB ) {
2019-07-22 15:10:42 +01:00
repairQueue := db . RepairQueue ( )
nullPath := [ ] byte ( "/path/null" )
recentRepairPath := [ ] byte ( "/path/recent" )
oldRepairPath := [ ] byte ( "/path/old" )
olderRepairPath := [ ] byte ( "/path/older" )
for _ , path := range [ ] [ ] byte { oldRepairPath , recentRepairPath , nullPath , olderRepairPath } {
2020-10-30 10:41:22 +00:00
injuredSeg := & internalpb . InjuredSegment { Path : path }
2020-05-22 20:54:05 +01:00
alreadyInserted , err := repairQueue . Insert ( ctx , injuredSeg , 10 )
2019-07-22 15:10:42 +01:00
require . NoError ( t , err )
2020-05-22 20:54:05 +01:00
require . False ( t , alreadyInserted )
2019-07-22 15:10:42 +01:00
}
2019-08-26 11:19:02 +01:00
// TODO: remove dependency on *dbx.DB
2020-11-28 16:23:39 +00:00
dbAccess := db . RepairQueue ( ) . TestDBAccess ( )
2019-07-22 15:10:42 +01:00
err := dbAccess . WithTx ( ctx , func ( ctx context . Context , tx * dbx . Tx ) error {
updateList := [ ] struct {
path [ ] byte
attempted time . Time
} {
{ recentRepairPath , time . Now ( ) } ,
2020-02-21 21:32:05 +00:00
{ oldRepairPath , time . Now ( ) . Add ( - 7 * time . Hour ) } ,
{ olderRepairPath , time . Now ( ) . Add ( - 8 * time . Hour ) } ,
2019-07-22 15:10:42 +01:00
}
for _ , item := range updateList {
2020-02-11 15:33:34 +00:00
res , err := tx . Tx . ExecContext ( ctx , dbAccess . Rebind ( ` UPDATE injuredsegments SET attempted = ? WHERE path = ? ` ) , item . attempted , item . path )
2019-07-22 15:10:42 +01:00
if err != nil {
return err
}
count , err := res . RowsAffected ( )
if err != nil {
return err
}
require . EqualValues ( t , 1 , count )
}
return nil
} )
require . NoError ( t , err )
// path with attempted = null should be selected first
injuredSeg , err := repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( nullPath ) , string ( injuredSeg . Path ) )
2020-02-21 21:32:05 +00:00
// path with attempted = 8 hours ago should be selected next
2019-07-22 15:10:42 +01:00
injuredSeg , err = repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( olderRepairPath ) , string ( injuredSeg . Path ) )
2020-02-21 21:32:05 +00:00
// path with attempted = 7 hours ago should be selected next
2019-07-22 15:10:42 +01:00
injuredSeg , err = repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( oldRepairPath ) , string ( injuredSeg . Path ) )
// queue should be considered "empty" now
injuredSeg , err = repairQueue . Select ( ctx )
assert . True ( t , storage . ErrEmptyQueue . Has ( err ) )
assert . Nil ( t , injuredSeg )
} )
}
2019-07-30 16:21:40 +01:00
2020-02-21 21:32:05 +00:00
// TestOrderHealthyPieces ensures that we select in the correct order, accounting for segment health as well as last attempted repair time.
func TestOrderHealthyPieces ( t * testing . T ) {
satellitedbtest . Run ( t , func ( ctx * testcontext . Context , t * testing . T , db satellite . DB ) {
repairQueue := db . RepairQueue ( )
2020-10-21 23:02:54 +01:00
// we insert (path, segmentHealth, lastAttempted) as follows:
2020-02-21 21:32:05 +00:00
// ("path/a", 6, now-8h)
// ("path/b", 7, now)
// ("path/c", 8, null)
// ("path/d", 9, null)
// ("path/e", 9, now-7h)
// ("path/f", 9, now-8h)
// ("path/g", 10, null)
// ("path/h", 10, now-8h)
// TODO: remove dependency on *dbx.DB
2020-11-28 16:23:39 +00:00
dbAccess := db . RepairQueue ( ) . TestDBAccess ( )
2020-02-21 21:32:05 +00:00
// insert the 8 segments according to the plan above
injuredSegList := [ ] struct {
2020-10-21 23:02:54 +01:00
path [ ] byte
segmentHealth float64
attempted time . Time
2020-02-21 21:32:05 +00:00
} {
{ [ ] byte ( "path/a" ) , 6 , time . Now ( ) . Add ( - 8 * time . Hour ) } ,
{ [ ] byte ( "path/b" ) , 7 , time . Now ( ) } ,
{ [ ] byte ( "path/c" ) , 8 , time . Time { } } ,
{ [ ] byte ( "path/d" ) , 9 , time . Time { } } ,
{ [ ] byte ( "path/e" ) , 9 , time . Now ( ) . Add ( - 7 * time . Hour ) } ,
{ [ ] byte ( "path/f" ) , 9 , time . Now ( ) . Add ( - 8 * time . Hour ) } ,
{ [ ] byte ( "path/g" ) , 10 , time . Time { } } ,
{ [ ] byte ( "path/h" ) , 10 , time . Now ( ) . Add ( - 8 * time . Hour ) } ,
}
// shuffle list since select order should not depend on insert order
rand . Seed ( time . Now ( ) . UnixNano ( ) )
rand . Shuffle ( len ( injuredSegList ) , func ( i , j int ) {
injuredSegList [ i ] , injuredSegList [ j ] = injuredSegList [ j ] , injuredSegList [ i ]
} )
for _ , item := range injuredSegList {
// first, insert the injured segment
2020-10-30 10:41:22 +00:00
injuredSeg := & internalpb . InjuredSegment { Path : item . path }
2020-10-21 23:02:54 +01:00
alreadyInserted , err := repairQueue . Insert ( ctx , injuredSeg , item . segmentHealth )
2020-02-21 21:32:05 +00:00
require . NoError ( t , err )
2020-05-22 20:54:05 +01:00
require . False ( t , alreadyInserted )
2020-02-21 21:32:05 +00:00
// next, if applicable, update the "attempted at" timestamp
if ! item . attempted . IsZero ( ) {
2020-03-10 22:05:01 +00:00
res , err := dbAccess . ExecContext ( ctx , dbAccess . Rebind ( ` UPDATE injuredsegments SET attempted = ? WHERE path = ? ` ) , item . attempted , item . path )
2020-02-21 21:32:05 +00:00
require . NoError ( t , err )
count , err := res . RowsAffected ( )
require . NoError ( t , err )
require . EqualValues ( t , 1 , count )
}
}
// we expect segment health to be prioritized first
// if segment health is equal, we expect the least recently attempted, with nulls first, to be prioritized first
// (excluding segments that have been attempted in the past six hours)
// we do not expect to see segments that have been attempted in the past hour
// therefore, the order of selection should be:
// "path/a", "path/c", "path/d", "path/f", "path/e", "path/g", "path/h"
// "path/b" will not be selected because it was attempted recently
for _ , nextPath := range [ ] string {
"path/a" ,
"path/c" ,
"path/d" ,
"path/f" ,
"path/e" ,
"path/g" ,
"path/h" ,
} {
injuredSeg , err := repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , nextPath , string ( injuredSeg . Path ) )
}
// queue should be considered "empty" now
injuredSeg , err := repairQueue . Select ( ctx )
assert . True ( t , storage . ErrEmptyQueue . Has ( err ) )
assert . Nil ( t , injuredSeg )
} )
}
// TestOrderOverwrite ensures that re-inserting the same segment with a lower health, will properly adjust its prioritizationTestOrderOverwrite ensures that re-inserting the same segment with a lower health, will properly adjust its prioritization.
func TestOrderOverwrite ( t * testing . T ) {
satellitedbtest . Run ( t , func ( ctx * testcontext . Context , t * testing . T , db satellite . DB ) {
repairQueue := db . RepairQueue ( )
2020-10-21 23:02:54 +01:00
// insert "path/a" with segment segment health 10
// insert "path/b" with segment segment health 9
// re-insert "path/a" with segment segment health 8
2020-02-21 21:32:05 +00:00
// when we select, expect "path/a" first since after the re-insert, it is the least durable segment.
// insert the 8 segments according to the plan above
injuredSegList := [ ] struct {
2020-10-21 23:02:54 +01:00
path [ ] byte
segmentHealth float64
2020-02-21 21:32:05 +00:00
} {
{ [ ] byte ( "path/a" ) , 10 } ,
{ [ ] byte ( "path/b" ) , 9 } ,
{ [ ] byte ( "path/a" ) , 8 } ,
}
2020-05-22 20:54:05 +01:00
for i , item := range injuredSegList {
2020-10-30 10:41:22 +00:00
injuredSeg := & internalpb . InjuredSegment { Path : item . path }
2020-10-21 23:02:54 +01:00
alreadyInserted , err := repairQueue . Insert ( ctx , injuredSeg , item . segmentHealth )
2020-02-21 21:32:05 +00:00
require . NoError ( t , err )
2020-05-22 20:54:05 +01:00
if i == 2 {
require . True ( t , alreadyInserted )
} else {
require . False ( t , alreadyInserted )
}
2020-02-21 21:32:05 +00:00
}
for _ , nextPath := range [ ] string {
"path/a" ,
"path/b" ,
} {
injuredSeg , err := repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , nextPath , string ( injuredSeg . Path ) )
}
// queue should be considered "empty" now
injuredSeg , err := repairQueue . Select ( ctx )
assert . True ( t , storage . ErrEmptyQueue . Has ( err ) )
assert . Nil ( t , injuredSeg )
} )
}
2019-08-26 11:19:02 +01:00
func TestCount ( t * testing . T ) {
2020-01-19 16:29:15 +00:00
satellitedbtest . Run ( t , func ( ctx * testcontext . Context , t * testing . T , db satellite . DB ) {
2019-07-30 16:21:40 +01:00
repairQueue := db . RepairQueue ( )
// insert a bunch of segments
pathsMap := make ( map [ string ] int )
2020-05-22 20:54:05 +01:00
numSegments := 20
2019-07-30 16:21:40 +01:00
for i := 0 ; i < numSegments ; i ++ {
2020-06-12 07:35:26 +01:00
path := "/path/" + strconv . Itoa ( i )
2020-10-30 10:41:22 +00:00
injuredSeg := & internalpb . InjuredSegment { Path : [ ] byte ( path ) }
2020-05-22 20:54:05 +01:00
alreadyInserted , err := repairQueue . Insert ( ctx , injuredSeg , 10 )
2019-07-30 16:21:40 +01:00
require . NoError ( t , err )
2020-05-22 20:54:05 +01:00
require . False ( t , alreadyInserted )
2019-07-30 16:21:40 +01:00
pathsMap [ path ] = 0
}
count , err := repairQueue . Count ( ctx )
require . NoError ( t , err )
require . Equal ( t , count , numSegments )
} )
}