reorganizes audit package into Service, Cursor, and Verifier (#450)
This commit is contained in:
parent
0d28101085
commit
14b67af327
@ -18,47 +18,47 @@ import (
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
)
|
||||
|
||||
// Audit to audit segments
|
||||
type Audit struct {
|
||||
// Stripe keeps track of a stripe's index and its parent segment
|
||||
type Stripe struct {
|
||||
Index int
|
||||
Segment *pb.Pointer
|
||||
}
|
||||
|
||||
// Cursor keeps track of audit location in pointer db
|
||||
type Cursor struct {
|
||||
pointers pdbclient.Client
|
||||
lastPath *paths.Path
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewAudit creates a new instance of audit
|
||||
func NewAudit(pointers pdbclient.Client) *Audit {
|
||||
return &Audit{
|
||||
pointers: pointers,
|
||||
}
|
||||
}
|
||||
|
||||
// Stripe is a struct that contains stripe info
|
||||
type Stripe struct {
|
||||
Index int
|
||||
// NewCursor creates a Cursor which iterates over pointer db
|
||||
func NewCursor(pointers pdbclient.Client) *Cursor {
|
||||
return &Cursor{pointers: pointers}
|
||||
}
|
||||
|
||||
// NextStripe returns a random stripe to be audited
|
||||
func (audit *Audit) NextStripe(ctx context.Context) (stripe *Stripe, more bool, err error) {
|
||||
audit.mutex.Lock()
|
||||
defer audit.mutex.Unlock()
|
||||
func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error) {
|
||||
cursor.mutex.Lock()
|
||||
defer cursor.mutex.Unlock()
|
||||
|
||||
var pointerItems []pdbclient.ListItem
|
||||
var path paths.Path
|
||||
var more bool
|
||||
|
||||
if audit.lastPath == nil {
|
||||
pointerItems, more, err = audit.pointers.List(ctx, nil, nil, nil, true, 0, meta.None)
|
||||
if cursor.lastPath == nil {
|
||||
pointerItems, more, err = cursor.pointers.List(ctx, nil, nil, nil, true, 0, meta.None)
|
||||
} else {
|
||||
pointerItems, more, err = audit.pointers.List(ctx, nil, *audit.lastPath, nil, true, 0, meta.None)
|
||||
pointerItems, more, err = cursor.pointers.List(ctx, nil, *cursor.lastPath, nil, true, 0, meta.None)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, more, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get random pointer
|
||||
pointerItem, err := getRandomPointer(pointerItems)
|
||||
if err != nil {
|
||||
return nil, more, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get path
|
||||
@ -66,30 +66,30 @@ func (audit *Audit) NextStripe(ctx context.Context) (stripe *Stripe, more bool,
|
||||
|
||||
// keep track of last path listed
|
||||
if !more {
|
||||
audit.lastPath = nil
|
||||
cursor.lastPath = nil
|
||||
} else {
|
||||
audit.lastPath = &pointerItems[len(pointerItems)-1].Path
|
||||
cursor.lastPath = &pointerItems[len(pointerItems)-1].Path
|
||||
}
|
||||
|
||||
// get pointer info
|
||||
pointer, err := audit.pointers.Get(ctx, path)
|
||||
pointer, err := cursor.pointers.Get(ctx, path)
|
||||
if err != nil {
|
||||
return nil, more, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create the erasure scheme so we can get the stripe size
|
||||
es, err := makeErasureScheme(pointer.GetRemote().GetRedundancy())
|
||||
if err != nil {
|
||||
return nil, more, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//get random stripe
|
||||
index, err := getRandomStripe(es, pointer)
|
||||
if err != nil {
|
||||
return nil, more, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Stripe{Index: index}, more, nil
|
||||
return &Stripe{Index: index, Segment: pointer}, nil
|
||||
}
|
||||
|
||||
// create the erasure scheme
|
@ -115,7 +115,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
|
||||
ctx = auth.WithAPIKey(ctx, nil)
|
||||
|
||||
//PointerDB instantation
|
||||
// PointerDB instantiation
|
||||
db := teststore.New()
|
||||
c := pointerdb.Config{MaxInlineSegmentSize: 8000}
|
||||
|
||||
@ -135,7 +135,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
pointers := pdbclient.New(pdbw)
|
||||
|
||||
// create a pdb client and instance of audit
|
||||
a := NewAudit(pointers)
|
||||
cursor := NewCursor(pointers)
|
||||
|
||||
// put 10 paths in db
|
||||
t.Run("putToDB", func(t *testing.T) {
|
||||
@ -149,7 +149,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
// create putreq. object
|
||||
req := &pb.PutRequest{Path: tt.path.String(), Pointer: putRequest.Pointer}
|
||||
|
||||
//Put pointer into db
|
||||
// put pointer into db
|
||||
_, err := pdbw.Put(ctx, req)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to put %v: error: %v", req.Pointer, err)
|
||||
@ -166,7 +166,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.bm, func(t *testing.T) {
|
||||
assert1 := assert.New(t)
|
||||
stripe, _, err := a.NextStripe(ctx)
|
||||
stripe, err := cursor.NextStripe(ctx)
|
||||
if err != nil {
|
||||
assert1.Error(err)
|
||||
assert1.Nil(stripe)
|
||||
@ -179,7 +179,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
})
|
||||
|
||||
// test to see how random paths are
|
||||
t.Run("probalisticTest", func(t *testing.T) {
|
||||
t.Run("probabilisticTest", func(t *testing.T) {
|
||||
list, _, err := pointers.List(ctx, nil, nil, nil, true, 10, meta.None)
|
||||
if err != nil {
|
||||
t.Error(ErrNoList)
|
50
pkg/audit/service.go
Normal file
50
pkg/audit/service.go
Normal file
@ -0,0 +1,50 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
// Service helps coordinate Cursor and Verifier to run the audit process continuously
|
||||
type Service struct {
|
||||
Cursor *Cursor
|
||||
Verifier *Verifier
|
||||
statdb *statdb
|
||||
}
|
||||
|
||||
// NewService instantiates a Service with access to a Cursor and Verifier
|
||||
func NewService(pointers pdbclient.Client, transport transport.Client, overlay overlay.Client, id provider.FullIdentity) *Service {
|
||||
cursor := NewCursor(pointers)
|
||||
verifier := NewVerifier(transport, overlay, id)
|
||||
return &Service{Cursor: cursor, Verifier: verifier}
|
||||
}
|
||||
|
||||
// Run calls Cursor and Verifier to continuously request random pointers, then verify data correctness at
|
||||
// a random stripe within a segment
|
||||
func (service *Service) Run(ctx context.Context) (err error) {
|
||||
// TODO(James): make this function run indefinitely instead of once
|
||||
stripe, err := service.Cursor.NextStripe(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
failedNodes, err := service.Verifier.verify(ctx, stripe.Index, stripe.Segment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, fail := range failedNodes {
|
||||
service.statdb.RecordFailedAudit(fail)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type statdb struct{}
|
||||
|
||||
func (db *statdb) RecordFailedAudit(*pb.Node) {}
|
@ -28,8 +28,8 @@ type share struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// Auditor implements the downloader interface
|
||||
type Auditor struct {
|
||||
// Verifier helps verify the correctness of a given stripe
|
||||
type Verifier struct {
|
||||
downloader downloader
|
||||
}
|
||||
|
||||
@ -37,23 +37,21 @@ type downloader interface {
|
||||
DownloadShares(ctx context.Context, pointer *pb.Pointer, stripeIndex int) (shares []share, nodes []*pb.Node, err error)
|
||||
}
|
||||
|
||||
// downloader implements the downloader interface
|
||||
//nolint - defaultDownloader isn't called in tests
|
||||
// defaultDownloader downloads shares from networked storage nodes
|
||||
type defaultDownloader struct {
|
||||
transport transport.Client
|
||||
overlay overlay.Client
|
||||
identity provider.FullIdentity
|
||||
}
|
||||
|
||||
// newDownloader creates a new instance of a defaultDownloader struct
|
||||
//nolint - newDefaultDownloader isn't called in tests
|
||||
func newDefaultDownloader(t transport.Client, o overlay.Client, id provider.FullIdentity) *defaultDownloader {
|
||||
return &defaultDownloader{transport: t, overlay: o, identity: id}
|
||||
// newDefaultDownloader creates a defaultDownloader
|
||||
func newDefaultDownloader(transport transport.Client, overlay overlay.Client, id provider.FullIdentity) *defaultDownloader {
|
||||
return &defaultDownloader{transport: transport, overlay: overlay, identity: id}
|
||||
}
|
||||
|
||||
// NewAuditor creates a new instance of an Auditor struct
|
||||
func NewAuditor(downloader downloader) *Auditor {
|
||||
return &Auditor{downloader: downloader}
|
||||
// NewVerifier creates a Verifier
|
||||
func NewVerifier(transport transport.Client, overlay overlay.Client, id provider.FullIdentity) *Verifier {
|
||||
return &Verifier{downloader: newDefaultDownloader(transport, overlay, id)}
|
||||
}
|
||||
|
||||
func (d *defaultDownloader) dial(ctx context.Context, node *pb.Node) (ps client.PSClient, err error) {
|
||||
@ -159,8 +157,6 @@ func makeCopies(ctx context.Context, originals []share) (copies []infectious.Sha
|
||||
}
|
||||
copies[i].Data = append([]byte(nil), original.Data...)
|
||||
copies[i].Number = original.PieceNumber
|
||||
|
||||
// do i encode inside of copies?
|
||||
}
|
||||
return copies, nil
|
||||
}
|
||||
@ -199,13 +195,11 @@ func calcPadded(size int64, blockSize int) int64 {
|
||||
return size + int64(blockSize) - mod
|
||||
}
|
||||
|
||||
// auditStripe gets remote segments from a pointer and runs an audit on shares
|
||||
// at a given stripe index
|
||||
// TODO(nat): maybe removed required/total here?
|
||||
func (a *Auditor) auditStripe(ctx context.Context, pointer *pb.Pointer, stripeIndex int) (badNodes []*pb.Node, err error) {
|
||||
// verify downloads shares then verifies the data correctness at the given stripe
|
||||
func (verifier *Verifier) verify(ctx context.Context, stripeIndex int, pointer *pb.Pointer) (failedNodes []*pb.Node, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
shares, nodes, err := a.downloader.DownloadShares(ctx, pointer, stripeIndex)
|
||||
shares, nodes, err := verifier.downloader.DownloadShares(ctx, pointer, stripeIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -216,9 +210,10 @@ func (a *Auditor) auditStripe(ctx context.Context, pointer *pb.Pointer, stripeIn
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, pieceNum := range pieceNums {
|
||||
badNodes = append(badNodes, nodes[pieceNum])
|
||||
}
|
||||
|
||||
return badNodes, nil
|
||||
for _, pieceNum := range pieceNums {
|
||||
failedNodes = append(failedNodes, nodes[pieceNum])
|
||||
}
|
||||
// TODO(nat): update statdb with the bad nodes
|
||||
return failedNodes, nil
|
||||
}
|
@ -41,14 +41,14 @@ func TestPassingAudit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
md := mockDownloader{shares: mockShares}
|
||||
auditor := &Auditor{downloader: &md}
|
||||
verifier := &Verifier{downloader: &md}
|
||||
pointer := makePointer(tt.nodeAmt)
|
||||
badNodes, err := auditor.auditStripe(ctx, pointer, 6)
|
||||
failedNodes, err := verifier.verify(ctx, 6, pointer)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(badNodes) != 0 {
|
||||
t.Fatal(err)
|
||||
if len(failedNodes) != 0 {
|
||||
t.Fatal("expected there to be no recorded bad nodes")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -141,7 +141,6 @@ func TestNotEnoughShares(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCalcPadded(t *testing.T) {
|
||||
|
||||
for _, tt := range []struct {
|
||||
segSize int64
|
||||
blockSize int
|
Loading…
Reference in New Issue
Block a user