Replace pdb client with server in audit service (#1016)
* add logger to audit service * use pointerdb instead of pdbclient in audit * linter fixes
This commit is contained in:
parent
625ae46ae5
commit
0642592420
@ -13,7 +13,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
@ -28,13 +28,13 @@ type Stripe struct {
|
||||
|
||||
// Cursor keeps track of audit location in pointer db
|
||||
type Cursor struct {
|
||||
pointers pdbclient.Client
|
||||
pointers *pointerdb.Server
|
||||
lastPath storj.Path
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewCursor creates a Cursor which iterates over pointer db
|
||||
func NewCursor(pointers pdbclient.Client) *Cursor {
|
||||
func NewCursor(pointers *pointerdb.Server) *Cursor {
|
||||
return &Cursor{pointers: pointers}
|
||||
}
|
||||
|
||||
@ -43,19 +43,25 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
cursor.mutex.Lock()
|
||||
defer cursor.mutex.Unlock()
|
||||
|
||||
var pointerItems []pdbclient.ListItem
|
||||
var pointerItems []*pb.ListResponse_Item
|
||||
var path storj.Path
|
||||
var more bool
|
||||
|
||||
if cursor.lastPath == "" {
|
||||
pointerItems, more, err = cursor.pointers.List(ctx, "", "", "", true, 0, meta.None)
|
||||
} else {
|
||||
pointerItems, more, err = cursor.pointers.List(ctx, "", cursor.lastPath, "", true, 0, meta.None)
|
||||
}
|
||||
listRes, err := cursor.pointers.List(ctx, &pb.ListRequest{
|
||||
Prefix: "",
|
||||
StartAfter: cursor.lastPath,
|
||||
EndBefore: "",
|
||||
Recursive: true,
|
||||
Limit: 0,
|
||||
MetaFlags: meta.None,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pointerItems = listRes.GetItems()
|
||||
more = listRes.GetMore()
|
||||
|
||||
if len(pointerItems) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@ -75,10 +81,13 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
}
|
||||
|
||||
// get pointer info
|
||||
pointer, _, _, err := cursor.pointers.Get(ctx, path)
|
||||
getRes, err := cursor.pointers.Get(ctx, &pb.GetRequest{Path: path})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pointer := getRes.GetPointer()
|
||||
pba := getRes.GetPba()
|
||||
authorization := getRes.GetAuthorization()
|
||||
|
||||
if pointer.GetType() != pb.Pointer_REMOTE {
|
||||
return nil, nil
|
||||
@ -99,12 +108,12 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
authorization := cursor.pointers.SignedMessage()
|
||||
pba, err := cursor.pointers.PayerBandwidthAllocation(ctx, pb.PayerBandwidthAllocation_GET_AUDIT)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Stripe{Index: index, Segment: pointer, PBA: pba, Authorization: authorization}, nil
|
||||
return &Stripe{
|
||||
Index: index,
|
||||
Segment: pointer,
|
||||
PBA: pba,
|
||||
Authorization: authorization,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeErasureScheme(rs *pb.RedundancyScheme) (eestream.ErasureScheme, error) {
|
||||
@ -134,10 +143,10 @@ func getRandomStripe(es eestream.ErasureScheme, pointer *pb.Pointer) (index int,
|
||||
return int(randomStripeIndex.Int64()), nil
|
||||
}
|
||||
|
||||
func getRandomPointer(pointerItems []pdbclient.ListItem) (pointer pdbclient.ListItem, err error) {
|
||||
func getRandomPointer(pointerItems []*pb.ListResponse_Item) (pointer *pb.ListResponse_Item, err error) {
|
||||
randomNum, err := rand.Int(rand.Reader, big.NewInt(int64(len(pointerItems))))
|
||||
if err != nil {
|
||||
return pdbclient.ListItem{}, err
|
||||
return &pb.ListResponse_Item{}, err
|
||||
}
|
||||
randomNumInt64 := randomNum.Int64()
|
||||
pointerItem := pointerItems[randomNumInt64]
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/internal/teststorj"
|
||||
@ -22,7 +21,6 @@ import (
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/storage/teststore"
|
||||
@ -34,35 +32,6 @@ var (
|
||||
ErrNoNum = errors.New("num error: failed to get num")
|
||||
)
|
||||
|
||||
// pointerDBWrapper wraps pb.PointerDBServer to be compatible with pb.PointerDBClient
|
||||
type pointerDBWrapper struct {
|
||||
s pb.PointerDBServer
|
||||
}
|
||||
|
||||
func newPointerDBWrapper(pdbs pb.PointerDBServer) pb.PointerDBClient {
|
||||
return &pointerDBWrapper{pdbs}
|
||||
}
|
||||
|
||||
func (pbd *pointerDBWrapper) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) {
|
||||
return pbd.s.Put(ctx, in)
|
||||
}
|
||||
|
||||
func (pbd *pointerDBWrapper) Get(ctx context.Context, in *pb.GetRequest, opts ...grpc.CallOption) (*pb.GetResponse, error) {
|
||||
return pbd.s.Get(ctx, in)
|
||||
}
|
||||
|
||||
func (pbd *pointerDBWrapper) List(ctx context.Context, in *pb.ListRequest, opts ...grpc.CallOption) (*pb.ListResponse, error) {
|
||||
return pbd.s.List(ctx, in)
|
||||
}
|
||||
|
||||
func (pbd *pointerDBWrapper) Delete(ctx context.Context, in *pb.DeleteRequest, opts ...grpc.CallOption) (*pb.DeleteResponse, error) {
|
||||
return pbd.s.Delete(ctx, in)
|
||||
}
|
||||
|
||||
func (pbd *pointerDBWrapper) PayerBandwidthAllocation(ctx context.Context, in *pb.PayerBandwidthAllocationRequest, opts ...grpc.CallOption) (*pb.PayerBandwidthAllocationResponse, error) {
|
||||
return pbd.s.PayerBandwidthAllocation(ctx, in)
|
||||
}
|
||||
|
||||
func TestAuditSegment(t *testing.T) {
|
||||
type pathCount struct {
|
||||
path storj.Path
|
||||
@ -131,8 +100,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
|
||||
cache := overlay.NewCache(teststore.New(), nil)
|
||||
|
||||
pdbw := newPointerDBWrapper(pointerdb.NewServer(db, cache, zap.NewNop(), c, identity))
|
||||
pointers := pdbclient.New(pdbw)
|
||||
pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, identity)
|
||||
|
||||
// create a pdb client and instance of audit
|
||||
cursor := NewCursor(pointers)
|
||||
@ -150,7 +118,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
req := &pb.PutRequest{Path: tt.path, Pointer: putRequest.Pointer}
|
||||
|
||||
// put pointer into db
|
||||
_, err := pdbw.Put(ctx, req)
|
||||
_, err := pointers.Put(ctx, req)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to put %v: error: %v", req.Pointer, err)
|
||||
assert1.NotNil(err)
|
||||
@ -180,11 +148,20 @@ func TestAuditSegment(t *testing.T) {
|
||||
|
||||
// test to see how random paths are
|
||||
t.Run("probabilisticTest", func(t *testing.T) {
|
||||
list, _, err := pointers.List(ctx, "", "", "", true, 10, meta.None)
|
||||
listRes, err := pointers.List(ctx, &pb.ListRequest{
|
||||
Prefix: "",
|
||||
StartAfter: "",
|
||||
EndBefore: "",
|
||||
Recursive: true,
|
||||
Limit: 10,
|
||||
MetaFlags: meta.None,
|
||||
})
|
||||
if err != nil {
|
||||
t.Error(ErrNoList)
|
||||
}
|
||||
|
||||
list := listRes.GetItems()
|
||||
|
||||
// get count of items picked at random
|
||||
uniquePathCounted := []pathCount{}
|
||||
pathCounter := []pathCount{}
|
||||
|
@ -10,13 +10,14 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
// Service helps coordinate Cursor and Verifier to run the audit process continuously
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
Cursor *Cursor
|
||||
Verifier *Verifier
|
||||
Reporter reporter
|
||||
@ -34,7 +35,7 @@ type Config struct {
|
||||
// Run runs the repairer with the configured values
|
||||
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
||||
identity := server.Identity()
|
||||
pointers, err := pdbclient.NewClient(identity, c.SatelliteAddr, c.APIKey)
|
||||
pointers := pointerdb.LoadFromContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -43,19 +44,21 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
||||
return err
|
||||
}
|
||||
transport := transport.NewClient(identity)
|
||||
service, err := NewService(ctx, c.SatelliteAddr, c.Interval, c.MaxRetriesStatDB, pointers, transport, overlay, *identity, c.APIKey)
|
||||
|
||||
log := zap.L()
|
||||
service, err := NewService(ctx, log, c.SatelliteAddr, c.Interval, c.MaxRetriesStatDB, pointers, transport, overlay, *identity, c.APIKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
err := service.Run(ctx)
|
||||
zap.S().Error("audit service failed to run:", zap.Error(err))
|
||||
service.log.Error("audit service failed to run:", zap.Error(err))
|
||||
}()
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
||||
// NewService instantiates a Service with access to a Cursor and Verifier
|
||||
func NewService(ctx context.Context, statDBPort string, interval time.Duration, maxRetries int, pointers pdbclient.Client, transport transport.Client, overlay overlay.Client,
|
||||
func NewService(ctx context.Context, log *zap.Logger, statDBPort string, interval time.Duration, maxRetries int, pointers *pointerdb.Server, transport transport.Client, overlay overlay.Client,
|
||||
identity provider.FullIdentity, apiKey string) (service *Service, err error) {
|
||||
cursor := NewCursor(pointers)
|
||||
verifier := NewVerifier(transport, overlay, identity)
|
||||
@ -65,6 +68,7 @@ func NewService(ctx context.Context, statDBPort string, interval time.Duration,
|
||||
}
|
||||
|
||||
return &Service{
|
||||
log: log,
|
||||
Cursor: cursor,
|
||||
Verifier: verifier,
|
||||
Reporter: reporter,
|
||||
@ -75,12 +79,12 @@ func NewService(ctx context.Context, statDBPort string, interval time.Duration,
|
||||
// Run runs auditing service
|
||||
func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
zap.S().Info("Audit cron is starting up")
|
||||
service.log.Info("Audit cron is starting up")
|
||||
|
||||
for {
|
||||
err := service.process(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("process", zap.Error(err))
|
||||
service.log.Error("process", zap.Error(err))
|
||||
}
|
||||
|
||||
select {
|
||||
|
Loading…
Reference in New Issue
Block a user