From 06425924202998d01f58956b5368307bbe03b0ca Mon Sep 17 00:00:00 2001 From: Maximillian von Briesen Date: Thu, 10 Jan 2019 11:35:18 -0500 Subject: [PATCH] Replace pdb client with server in audit service (#1016) * add logger to audit service * use pointerdb instead of pdbclient in audit * linter fixes --- pkg/audit/cursor.go | 45 +++++++++++++++++++++++--------------- pkg/audit/cursor_test.go | 47 ++++++++++------------------------------ pkg/audit/service.go | 18 +++++++++------ 3 files changed, 50 insertions(+), 60 deletions(-) diff --git a/pkg/audit/cursor.go b/pkg/audit/cursor.go index ee117d1ec..18839f4c9 100644 --- a/pkg/audit/cursor.go +++ b/pkg/audit/cursor.go @@ -13,7 +13,7 @@ import ( "storj.io/storj/pkg/eestream" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/pointerdb/pdbclient" + "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/storage/meta" "storj.io/storj/pkg/storj" ) @@ -28,13 +28,13 @@ type Stripe struct { // Cursor keeps track of audit location in pointer db type Cursor struct { - pointers pdbclient.Client + pointers *pointerdb.Server lastPath storj.Path mutex sync.Mutex } // NewCursor creates a Cursor which iterates over pointer db -func NewCursor(pointers pdbclient.Client) *Cursor { +func NewCursor(pointers *pointerdb.Server) *Cursor { return &Cursor{pointers: pointers} } @@ -43,19 +43,25 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error cursor.mutex.Lock() defer cursor.mutex.Unlock() - var pointerItems []pdbclient.ListItem + var pointerItems []*pb.ListResponse_Item var path storj.Path var more bool - if cursor.lastPath == "" { - pointerItems, more, err = cursor.pointers.List(ctx, "", "", "", true, 0, meta.None) - } else { - pointerItems, more, err = cursor.pointers.List(ctx, "", cursor.lastPath, "", true, 0, meta.None) - } + listRes, err := cursor.pointers.List(ctx, &pb.ListRequest{ + Prefix: "", + StartAfter: cursor.lastPath, + EndBefore: "", + Recursive: true, + Limit: 0, + MetaFlags: meta.None, + }) if err != nil { return nil, err } + pointerItems = listRes.GetItems() + more = listRes.GetMore() + if len(pointerItems) == 0 { return nil, nil } @@ -75,10 +81,13 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error } // get pointer info - pointer, _, _, err := cursor.pointers.Get(ctx, path) + getRes, err := cursor.pointers.Get(ctx, &pb.GetRequest{Path: path}) if err != nil { return nil, err } + pointer := getRes.GetPointer() + pba := getRes.GetPba() + authorization := getRes.GetAuthorization() if pointer.GetType() != pb.Pointer_REMOTE { return nil, nil @@ -99,12 +108,12 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error return nil, err } - authorization := cursor.pointers.SignedMessage() - pba, err := cursor.pointers.PayerBandwidthAllocation(ctx, pb.PayerBandwidthAllocation_GET_AUDIT) - if err != nil { - return nil, err - } - return &Stripe{Index: index, Segment: pointer, PBA: pba, Authorization: authorization}, nil + return &Stripe{ + Index: index, + Segment: pointer, + PBA: pba, + Authorization: authorization, + }, nil } func makeErasureScheme(rs *pb.RedundancyScheme) (eestream.ErasureScheme, error) { @@ -134,10 +143,10 @@ func getRandomStripe(es eestream.ErasureScheme, pointer *pb.Pointer) (index int, return int(randomStripeIndex.Int64()), nil } -func getRandomPointer(pointerItems []pdbclient.ListItem) (pointer pdbclient.ListItem, err error) { +func getRandomPointer(pointerItems []*pb.ListResponse_Item) (pointer *pb.ListResponse_Item, err error) { randomNum, err := rand.Int(rand.Reader, big.NewInt(int64(len(pointerItems)))) if err != nil { - return pdbclient.ListItem{}, err + return &pb.ListResponse_Item{}, err } randomNumInt64 := randomNum.Int64() pointerItem := pointerItems[randomNumInt64] diff --git a/pkg/audit/cursor_test.go b/pkg/audit/cursor_test.go index 477377e19..25228bebf 100644 --- a/pkg/audit/cursor_test.go +++ b/pkg/audit/cursor_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" - "google.golang.org/grpc" "storj.io/storj/internal/testidentity" "storj.io/storj/internal/teststorj" @@ -22,7 +21,6 @@ import ( "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/pointerdb" - "storj.io/storj/pkg/pointerdb/pdbclient" "storj.io/storj/pkg/storage/meta" "storj.io/storj/pkg/storj" "storj.io/storj/storage/teststore" @@ -34,35 +32,6 @@ var ( ErrNoNum = errors.New("num error: failed to get num") ) -// pointerDBWrapper wraps pb.PointerDBServer to be compatible with pb.PointerDBClient -type pointerDBWrapper struct { - s pb.PointerDBServer -} - -func newPointerDBWrapper(pdbs pb.PointerDBServer) pb.PointerDBClient { - return &pointerDBWrapper{pdbs} -} - -func (pbd *pointerDBWrapper) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) { - return pbd.s.Put(ctx, in) -} - -func (pbd *pointerDBWrapper) Get(ctx context.Context, in *pb.GetRequest, opts ...grpc.CallOption) (*pb.GetResponse, error) { - return pbd.s.Get(ctx, in) -} - -func (pbd *pointerDBWrapper) List(ctx context.Context, in *pb.ListRequest, opts ...grpc.CallOption) (*pb.ListResponse, error) { - return pbd.s.List(ctx, in) -} - -func (pbd *pointerDBWrapper) Delete(ctx context.Context, in *pb.DeleteRequest, opts ...grpc.CallOption) (*pb.DeleteResponse, error) { - return pbd.s.Delete(ctx, in) -} - -func (pbd *pointerDBWrapper) PayerBandwidthAllocation(ctx context.Context, in *pb.PayerBandwidthAllocationRequest, opts ...grpc.CallOption) (*pb.PayerBandwidthAllocationResponse, error) { - return pbd.s.PayerBandwidthAllocation(ctx, in) -} - func TestAuditSegment(t *testing.T) { type pathCount struct { path storj.Path @@ -131,8 +100,7 @@ func TestAuditSegment(t *testing.T) { cache := overlay.NewCache(teststore.New(), nil) - pdbw := newPointerDBWrapper(pointerdb.NewServer(db, cache, zap.NewNop(), c, identity)) - pointers := pdbclient.New(pdbw) + pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, identity) // create a pdb client and instance of audit cursor := NewCursor(pointers) @@ -150,7 +118,7 @@ func TestAuditSegment(t *testing.T) { req := &pb.PutRequest{Path: tt.path, Pointer: putRequest.Pointer} // put pointer into db - _, err := pdbw.Put(ctx, req) + _, err := pointers.Put(ctx, req) if err != nil { t.Fatalf("failed to put %v: error: %v", req.Pointer, err) assert1.NotNil(err) @@ -180,11 +148,20 @@ func TestAuditSegment(t *testing.T) { // test to see how random paths are t.Run("probabilisticTest", func(t *testing.T) { - list, _, err := pointers.List(ctx, "", "", "", true, 10, meta.None) + listRes, err := pointers.List(ctx, &pb.ListRequest{ + Prefix: "", + StartAfter: "", + EndBefore: "", + Recursive: true, + Limit: 10, + MetaFlags: meta.None, + }) if err != nil { t.Error(ErrNoList) } + list := listRes.GetItems() + // get count of items picked at random uniquePathCounted := []pathCount{} pathCounter := []pathCount{} diff --git a/pkg/audit/service.go b/pkg/audit/service.go index 5986560b8..5587cb015 100644 --- a/pkg/audit/service.go +++ b/pkg/audit/service.go @@ -10,13 +10,14 @@ import ( "go.uber.org/zap" "storj.io/storj/pkg/overlay" - "storj.io/storj/pkg/pointerdb/pdbclient" + "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/provider" "storj.io/storj/pkg/transport" ) // Service helps coordinate Cursor and Verifier to run the audit process continuously type Service struct { + log *zap.Logger Cursor *Cursor Verifier *Verifier Reporter reporter @@ -34,7 +35,7 @@ type Config struct { // Run runs the repairer with the configured values func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) { identity := server.Identity() - pointers, err := pdbclient.NewClient(identity, c.SatelliteAddr, c.APIKey) + pointers := pointerdb.LoadFromContext(ctx) if err != nil { return err } @@ -43,19 +44,21 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) return err } transport := transport.NewClient(identity) - service, err := NewService(ctx, c.SatelliteAddr, c.Interval, c.MaxRetriesStatDB, pointers, transport, overlay, *identity, c.APIKey) + + log := zap.L() + service, err := NewService(ctx, log, c.SatelliteAddr, c.Interval, c.MaxRetriesStatDB, pointers, transport, overlay, *identity, c.APIKey) if err != nil { return err } go func() { err := service.Run(ctx) - zap.S().Error("audit service failed to run:", zap.Error(err)) + service.log.Error("audit service failed to run:", zap.Error(err)) }() return server.Run(ctx) } // NewService instantiates a Service with access to a Cursor and Verifier -func NewService(ctx context.Context, statDBPort string, interval time.Duration, maxRetries int, pointers pdbclient.Client, transport transport.Client, overlay overlay.Client, +func NewService(ctx context.Context, log *zap.Logger, statDBPort string, interval time.Duration, maxRetries int, pointers *pointerdb.Server, transport transport.Client, overlay overlay.Client, identity provider.FullIdentity, apiKey string) (service *Service, err error) { cursor := NewCursor(pointers) verifier := NewVerifier(transport, overlay, identity) @@ -65,6 +68,7 @@ func NewService(ctx context.Context, statDBPort string, interval time.Duration, } return &Service{ + log: log, Cursor: cursor, Verifier: verifier, Reporter: reporter, @@ -75,12 +79,12 @@ func NewService(ctx context.Context, statDBPort string, interval time.Duration, // Run runs auditing service func (service *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) - zap.S().Info("Audit cron is starting up") + service.log.Info("Audit cron is starting up") for { err := service.process(ctx) if err != nil { - zap.L().Error("process", zap.Error(err)) + service.log.Error("process", zap.Error(err)) } select {