Diagnostic tool to inspect repair queue (#656)

* initial repair queue diag tool development

* fixes linter warnings

* code review updates
This commit is contained in:
aligeti 2018-11-16 08:31:33 -05:00 committed by GitHub
parent 08ade45446
commit 7958994ae2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 107 additions and 3 deletions

View File

@ -18,8 +18,9 @@ import (
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/bwagreement/database-manager"
dbmanager "storj.io/storj/pkg/bwagreement/database-manager"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
mockOverlay "storj.io/storj/pkg/overlay/mocks"
@ -28,6 +29,7 @@ import (
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/statdb"
"storj.io/storj/storage/redis"
)
var (
@ -50,6 +52,11 @@ var (
Short: "Diagnostic Tool support",
RunE: cmdDiag,
}
qdiagCmd = &cobra.Command{
Use: "qdiag",
Short: "Repair Queue Diagnostic Tool support",
RunE: cmdQDiag,
}
runCfg struct {
Identity provider.IdentityConfig
@ -73,6 +80,10 @@ var (
diagCfg struct {
DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"`
}
qdiagCfg struct {
DatabaseURL string `help:"the database connection string to use" default:"redis://127.0.0.1:6378?db=1&password=abc123"`
QListLimit int `help:"maximum segments that can be requested" default:"1000"`
}
defaultConfDir = "$HOME/.storj/satellite"
)
@ -81,9 +92,11 @@ func init() {
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(diagCmd)
rootCmd.AddCommand(qdiagCmd)
cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir))
cfgstruct.Bind(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir))
cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultConfDir))
cfgstruct.Bind(qdiagCmd.Flags(), &qdiagCfg, cfgstruct.ConfDir(defaultConfDir))
}
func cmdRun(cmd *cobra.Command, args []string) (err error) {
@ -224,8 +237,36 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) {
}
// display the data
err = w.Flush()
return err
return w.Flush()
}
func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
// open the redis db
dbpath := qdiagCfg.DatabaseURL
redisQ, err := redis.NewQueueFrom(dbpath)
if err != nil {
return err
}
queue := queue.NewQueue(redisQ)
list, err := queue.Peekqueue(qdiagCfg.QListLimit)
if err != nil {
return err
}
// initialize the table header (fields)
const padding = 3
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
fmt.Fprintln(w, "Path\tLost Pieces\t")
// populate the row fields
for _, v := range list {
fmt.Fprint(w, v.GetPath(), "\t", v.GetLostPieces(), "\t")
}
// display the data
return w.Flush()
}
func main() {

View File

@ -13,6 +13,7 @@ import (
type RepairQueue interface {
Enqueue(qi *pb.InjuredSegment) error
Dequeue() (pb.InjuredSegment, error)
Peekqueue(limit int) ([]pb.InjuredSegment, error)
}
// Queue implements the RepairQueue interface
@ -52,3 +53,23 @@ func (q *Queue) Dequeue() (pb.InjuredSegment, error) {
}
return *seg, nil
}
// Peekqueue returns upto 'limit' of the entries from the repair queue
func (q *Queue) Peekqueue(limit int) ([]pb.InjuredSegment, error) {
if limit < 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
result, err := q.db.Peekqueue(limit)
if err != nil {
return []pb.InjuredSegment{}, Error.New("error peeking into repair queue %s", err)
}
segs := make([]pb.InjuredSegment, 0)
for _, v := range result {
seg := &pb.InjuredSegment{}
if err = proto.Unmarshal(v, seg); err != nil {
return nil, err
}
segs = append(segs, *seg)
}
return segs, nil
}

View File

@ -54,6 +54,11 @@ func TestSequential(t *testing.T) {
assert.NoError(t, err)
addSegs = append(addSegs, seg)
}
list, err := q.Peekqueue(100)
assert.NoError(t, err)
for i := 0; i < N; i++ {
assert.True(t, proto.Equal(addSegs[i], &list[i]))
}
for i := 0; i < N; i++ {
dqSeg, err := q.Dequeue()
assert.NoError(t, err)

View File

@ -76,6 +76,8 @@ type Queue interface {
Enqueue(Value) error
//Dequeue removes a FIFO element, returning ErrEmptyQueue if empty
Dequeue() (Value, error)
//Peekqueue returns 'limit' elements from the queue
Peekqueue(limit int) ([]Value, error)
//Close closes the store
Close() error
}

View File

@ -46,3 +46,17 @@ func (client *Queue) Dequeue() (storage.Value, error) {
}
return storage.Value(out), nil
}
// Peekqueue returns upto 1000 entries in the queue without removing
func (client *Queue) Peekqueue(limit int) ([]storage.Value, error) {
cmd := client.db.LRange(queueKey, 0, int64(limit))
items, err := cmd.Result()
if err != nil {
return nil, err
}
result := make([]storage.Value, 0)
for _, v := range items {
result = append(result, storage.Value([]byte(v)))
}
return result, err
}

View File

@ -41,6 +41,24 @@ func (q *Queue) Dequeue() (storage.Value, error) {
return nil, storage.ErrEmptyQueue
}
//Peekqueue gets upto 'limit' entries from the list queue
func (q *Queue) Peekqueue(limit int) ([]storage.Value, error) {
q.mu.Lock()
defer q.mu.Unlock()
if limit < 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
result := make([]storage.Value, 0)
for e := q.s.Front(); e != nil; e = e.Next() {
result = append(result, e.Value.(storage.Value))
limit--
if limit <= 0 {
break
}
}
return result, nil
}
//Close closes the queue
func (q *Queue) Close() error {
return nil

View File

@ -22,6 +22,9 @@ func testBasic(t *testing.T, q storage.Queue) {
assert.NoError(t, err)
err = q.Enqueue(storage.Value([]byte{0, 0, 0, 0, 255, 255, 255, 255}))
assert.NoError(t, err)
list, err := q.Peekqueue(100)
assert.NotNil(t, list)
assert.NoError(t, err)
out, err := q.Dequeue()
assert.NoError(t, err)
assert.Equal(t, out, storage.Value("hello world"))