From 7958994ae23804af636d3382cba751f5ee821058 Mon Sep 17 00:00:00 2001 From: aligeti <34487396+aligeti@users.noreply.github.com> Date: Fri, 16 Nov 2018 08:31:33 -0500 Subject: [PATCH] Diagnostic tool to inspect repair queue (#656) * initial repair queue diag tool development * fixes linter warnings * code review updates --- cmd/satellite/main.go | 47 ++++++++++++++++++++++++++++-- pkg/datarepair/queue/queue.go | 21 +++++++++++++ pkg/datarepair/queue/queue_test.go | 5 ++++ storage/common.go | 2 ++ storage/redis/client_queue.go | 14 +++++++++ storage/testqueue/queue.go | 18 ++++++++++++ storage/testsuite/test_queue.go | 3 ++ 7 files changed, 107 insertions(+), 3 deletions(-) diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index cf051ba32..f10fa458e 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -18,8 +18,9 @@ import ( "storj.io/storj/pkg/auth/grpcauth" "storj.io/storj/pkg/bwagreement" - "storj.io/storj/pkg/bwagreement/database-manager" + dbmanager "storj.io/storj/pkg/bwagreement/database-manager" "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/datarepair/queue" "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/overlay" mockOverlay "storj.io/storj/pkg/overlay/mocks" @@ -28,6 +29,7 @@ import ( "storj.io/storj/pkg/process" "storj.io/storj/pkg/provider" "storj.io/storj/pkg/statdb" + "storj.io/storj/storage/redis" ) var ( @@ -50,6 +52,11 @@ var ( Short: "Diagnostic Tool support", RunE: cmdDiag, } + qdiagCmd = &cobra.Command{ + Use: "qdiag", + Short: "Repair Queue Diagnostic Tool support", + RunE: cmdQDiag, + } runCfg struct { Identity provider.IdentityConfig @@ -73,6 +80,10 @@ var ( diagCfg struct { DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"` } + qdiagCfg struct { + DatabaseURL string `help:"the database connection string to use" default:"redis://127.0.0.1:6378?db=1&password=abc123"` + QListLimit int `help:"maximum segments that can be requested" default:"1000"` + } defaultConfDir = "$HOME/.storj/satellite" ) @@ -81,9 +92,11 @@ func init() { rootCmd.AddCommand(runCmd) rootCmd.AddCommand(setupCmd) rootCmd.AddCommand(diagCmd) + rootCmd.AddCommand(qdiagCmd) cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir)) cfgstruct.Bind(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir)) cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultConfDir)) + cfgstruct.Bind(qdiagCmd.Flags(), &qdiagCfg, cfgstruct.ConfDir(defaultConfDir)) } func cmdRun(cmd *cobra.Command, args []string) (err error) { @@ -224,8 +237,36 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) { } // display the data - err = w.Flush() - return err + return w.Flush() +} + +func cmdQDiag(cmd *cobra.Command, args []string) (err error) { + // open the redis db + dbpath := qdiagCfg.DatabaseURL + + redisQ, err := redis.NewQueueFrom(dbpath) + if err != nil { + return err + } + + queue := queue.NewQueue(redisQ) + list, err := queue.Peekqueue(qdiagCfg.QListLimit) + if err != nil { + return err + } + + // initialize the table header (fields) + const padding = 3 + w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) + fmt.Fprintln(w, "Path\tLost Pieces\t") + + // populate the row fields + for _, v := range list { + fmt.Fprint(w, v.GetPath(), "\t", v.GetLostPieces(), "\t") + } + + // display the data + return w.Flush() } func main() { diff --git a/pkg/datarepair/queue/queue.go b/pkg/datarepair/queue/queue.go index e49148536..201ac2a81 100644 --- a/pkg/datarepair/queue/queue.go +++ b/pkg/datarepair/queue/queue.go @@ -13,6 +13,7 @@ import ( type RepairQueue interface { Enqueue(qi *pb.InjuredSegment) error Dequeue() (pb.InjuredSegment, error) + Peekqueue(limit int) ([]pb.InjuredSegment, error) } // Queue implements the RepairQueue interface @@ -52,3 +53,23 @@ func (q *Queue) Dequeue() (pb.InjuredSegment, error) { } return *seg, nil } + +// Peekqueue returns upto 'limit' of the entries from the repair queue +func (q *Queue) Peekqueue(limit int) ([]pb.InjuredSegment, error) { + if limit < 0 || limit > storage.LookupLimit { + limit = storage.LookupLimit + } + result, err := q.db.Peekqueue(limit) + if err != nil { + return []pb.InjuredSegment{}, Error.New("error peeking into repair queue %s", err) + } + segs := make([]pb.InjuredSegment, 0) + for _, v := range result { + seg := &pb.InjuredSegment{} + if err = proto.Unmarshal(v, seg); err != nil { + return nil, err + } + segs = append(segs, *seg) + } + return segs, nil +} diff --git a/pkg/datarepair/queue/queue_test.go b/pkg/datarepair/queue/queue_test.go index 0af6b74f0..4f45bb154 100644 --- a/pkg/datarepair/queue/queue_test.go +++ b/pkg/datarepair/queue/queue_test.go @@ -54,6 +54,11 @@ func TestSequential(t *testing.T) { assert.NoError(t, err) addSegs = append(addSegs, seg) } + list, err := q.Peekqueue(100) + assert.NoError(t, err) + for i := 0; i < N; i++ { + assert.True(t, proto.Equal(addSegs[i], &list[i])) + } for i := 0; i < N; i++ { dqSeg, err := q.Dequeue() assert.NoError(t, err) diff --git a/storage/common.go b/storage/common.go index b106215bd..eec96fe3b 100644 --- a/storage/common.go +++ b/storage/common.go @@ -76,6 +76,8 @@ type Queue interface { Enqueue(Value) error //Dequeue removes a FIFO element, returning ErrEmptyQueue if empty Dequeue() (Value, error) + //Peekqueue returns 'limit' elements from the queue + Peekqueue(limit int) ([]Value, error) //Close closes the store Close() error } diff --git a/storage/redis/client_queue.go b/storage/redis/client_queue.go index e60d38ae6..dff35910a 100644 --- a/storage/redis/client_queue.go +++ b/storage/redis/client_queue.go @@ -46,3 +46,17 @@ func (client *Queue) Dequeue() (storage.Value, error) { } return storage.Value(out), nil } + +// Peekqueue returns upto 1000 entries in the queue without removing +func (client *Queue) Peekqueue(limit int) ([]storage.Value, error) { + cmd := client.db.LRange(queueKey, 0, int64(limit)) + items, err := cmd.Result() + if err != nil { + return nil, err + } + result := make([]storage.Value, 0) + for _, v := range items { + result = append(result, storage.Value([]byte(v))) + } + return result, err +} diff --git a/storage/testqueue/queue.go b/storage/testqueue/queue.go index 8b813197d..008ab9f3a 100644 --- a/storage/testqueue/queue.go +++ b/storage/testqueue/queue.go @@ -41,6 +41,24 @@ func (q *Queue) Dequeue() (storage.Value, error) { return nil, storage.ErrEmptyQueue } +//Peekqueue gets upto 'limit' entries from the list queue +func (q *Queue) Peekqueue(limit int) ([]storage.Value, error) { + q.mu.Lock() + defer q.mu.Unlock() + if limit < 0 || limit > storage.LookupLimit { + limit = storage.LookupLimit + } + result := make([]storage.Value, 0) + for e := q.s.Front(); e != nil; e = e.Next() { + result = append(result, e.Value.(storage.Value)) + limit-- + if limit <= 0 { + break + } + } + return result, nil +} + //Close closes the queue func (q *Queue) Close() error { return nil diff --git a/storage/testsuite/test_queue.go b/storage/testsuite/test_queue.go index 0871eeaa3..eb9ffbeb6 100644 --- a/storage/testsuite/test_queue.go +++ b/storage/testsuite/test_queue.go @@ -22,6 +22,9 @@ func testBasic(t *testing.T, q storage.Queue) { assert.NoError(t, err) err = q.Enqueue(storage.Value([]byte{0, 0, 0, 0, 255, 255, 255, 255})) assert.NoError(t, err) + list, err := q.Peekqueue(100) + assert.NotNil(t, list) + assert.NoError(t, err) out, err := q.Dequeue() assert.NoError(t, err) assert.Equal(t, out, storage.Value("hello world"))