2019-07-12 18:35:20 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
2019-08-26 11:19:02 +01:00
package queue_test
2019-07-12 18:35:20 +01:00
import (
	"context"
	"strconv"
	"testing"
	"time"

	"github.com/lib/pq"
	sqlite3 "github.com/mattn/go-sqlite3"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/storj/internal/testcontext"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/satellite"
	dbx "storj.io/storj/satellite/satellitedb/dbx"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
	"storj.io/storj/storage"
)
2019-08-26 11:19:02 +01:00
func TestUntilEmpty ( t * testing . T ) {
2019-07-12 18:35:20 +01:00
satellitedbtest . Run ( t , func ( t * testing . T , db satellite . DB ) {
ctx := testcontext . New ( t )
defer ctx . Cleanup ( )
repairQueue := db . RepairQueue ( )
// insert a bunch of segments
pathsMap := make ( map [ string ] int )
for i := 0 ; i < 100 ; i ++ {
path := "/path/" + string ( i )
injuredSeg := & pb . InjuredSegment { Path : [ ] byte ( path ) }
err := repairQueue . Insert ( ctx , injuredSeg )
require . NoError ( t , err )
pathsMap [ path ] = 0
}
// select segments until no more are returned, and we should get each one exactly once
for {
injuredSeg , err := repairQueue . Select ( ctx )
if err != nil {
require . True ( t , storage . ErrEmptyQueue . Has ( err ) )
break
}
pathsMap [ string ( injuredSeg . Path ) ] ++
}
for _ , selectCount := range pathsMap {
assert . Equal ( t , selectCount , 1 )
}
} )
}
2019-07-22 15:10:42 +01:00
2019-08-26 11:19:02 +01:00
func TestOrder ( t * testing . T ) {
2019-07-22 15:10:42 +01:00
satellitedbtest . Run ( t , func ( t * testing . T , db satellite . DB ) {
ctx := testcontext . New ( t )
defer ctx . Cleanup ( )
repairQueue := db . RepairQueue ( )
nullPath := [ ] byte ( "/path/null" )
recentRepairPath := [ ] byte ( "/path/recent" )
oldRepairPath := [ ] byte ( "/path/old" )
olderRepairPath := [ ] byte ( "/path/older" )
for _ , path := range [ ] [ ] byte { oldRepairPath , recentRepairPath , nullPath , olderRepairPath } {
injuredSeg := & pb . InjuredSegment { Path : path }
err := repairQueue . Insert ( ctx , injuredSeg )
require . NoError ( t , err )
}
2019-08-26 11:19:02 +01:00
// TODO: remove dependency on *dbx.DB
2019-07-22 15:10:42 +01:00
dbAccess := db . ( interface { TestDBAccess ( ) * dbx . DB } ) . TestDBAccess ( )
var timeConvertPrefix string
switch d := dbAccess . DB . Driver ( ) . ( type ) {
case * sqlite3 . SQLiteDriver :
timeConvertPrefix = "datetime("
case * pq . Driver :
timeConvertPrefix = "timezone('utc', "
default :
t . Errorf ( "Unsupported database type %t" , d )
}
err := dbAccess . WithTx ( ctx , func ( ctx context . Context , tx * dbx . Tx ) error {
updateList := [ ] struct {
path [ ] byte
attempted time . Time
} {
{ recentRepairPath , time . Now ( ) } ,
{ oldRepairPath , time . Now ( ) . Add ( - 2 * time . Hour ) } ,
{ olderRepairPath , time . Now ( ) . Add ( - 3 * time . Hour ) } ,
}
for _ , item := range updateList {
res , err := tx . Tx . ExecContext ( ctx , dbAccess . Rebind ( ` UPDATE injuredsegments SET attempted = ` + timeConvertPrefix + ` ?) WHERE path = ? ` ) , item . attempted , item . path )
if err != nil {
return err
}
count , err := res . RowsAffected ( )
if err != nil {
return err
}
require . EqualValues ( t , 1 , count )
}
return nil
} )
require . NoError ( t , err )
// path with attempted = null should be selected first
injuredSeg , err := repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( nullPath ) , string ( injuredSeg . Path ) )
// path with attempted = 3 hours ago should be selected next
injuredSeg , err = repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( olderRepairPath ) , string ( injuredSeg . Path ) )
// path with attempted = 2 hours ago should be selected next
injuredSeg , err = repairQueue . Select ( ctx )
require . NoError ( t , err )
assert . Equal ( t , string ( oldRepairPath ) , string ( injuredSeg . Path ) )
// queue should be considered "empty" now
injuredSeg , err = repairQueue . Select ( ctx )
assert . True ( t , storage . ErrEmptyQueue . Has ( err ) )
assert . Nil ( t , injuredSeg )
} )
}
2019-07-30 16:21:40 +01:00
2019-08-26 11:19:02 +01:00
func TestCount ( t * testing . T ) {
2019-07-30 16:21:40 +01:00
satellitedbtest . Run ( t , func ( t * testing . T , db satellite . DB ) {
ctx := testcontext . New ( t )
defer ctx . Cleanup ( )
repairQueue := db . RepairQueue ( )
// insert a bunch of segments
pathsMap := make ( map [ string ] int )
numSegments := 100
for i := 0 ; i < numSegments ; i ++ {
path := "/path/" + string ( i )
injuredSeg := & pb . InjuredSegment { Path : [ ] byte ( path ) }
err := repairQueue . Insert ( ctx , injuredSeg )
require . NoError ( t , err )
pathsMap [ path ] = 0
}
count , err := repairQueue . Count ( ctx )
require . NoError ( t , err )
require . Equal ( t , count , numSegments )
} )
}