diff --git a/satellite/audit/chore.go b/satellite/audit/chore.go index 1c67762a1..802d889c8 100644 --- a/satellite/audit/chore.go +++ b/satellite/audit/chore.go @@ -10,7 +10,6 @@ import ( "go.uber.org/zap" - "storj.io/common/storj" "storj.io/common/sync2" "storj.io/storj/satellite/metainfo" ) @@ -53,30 +52,30 @@ func (chore *Chore) Run(ctx context.Context) (err error) { return err } - pathCollector := NewPathCollector(chore.config.Slots, chore.rand) - err = chore.metainfoLoop.Join(ctx, pathCollector) + collector := NewCollector(chore.config.Slots, chore.rand) + err = chore.metainfoLoop.Join(ctx, collector) if err != nil { chore.log.Error("error joining metainfoloop", zap.Error(err)) return nil } - var newQueue []storj.Path - queuePaths := make(map[storj.Path]struct{}) + var newQueue []Segment + queueSegments := make(map[Segment]struct{}) - // Add reservoir paths to queue in pseudorandom order. + // Add reservoir segments to queue in pseudorandom order. for i := 0; i < chore.config.Slots; i++ { - for _, res := range pathCollector.Reservoirs { - // Skip reservoir if no path at this index. - if len(res.Paths) <= i { + for _, res := range collector.Reservoirs { + // Skip reservoir if no segment at this index. + if len(res.Segments) <= i { continue } - path := res.Paths[i] - if path == "" { + segment := res.Segments[i] + if segment == (Segment{}) { continue } - if _, ok := queuePaths[path]; !ok { - newQueue = append(newQueue, path) - queuePaths[path] = struct{}{} + if _, ok := queueSegments[segment]; !ok { + newQueue = append(newQueue, segment) + queueSegments[segment] = struct{}{} } } } diff --git a/satellite/audit/pathcollector.go b/satellite/audit/collector.go similarity index 56% rename from satellite/audit/pathcollector.go rename to satellite/audit/collector.go index 9c511bcb5..a87419b82 100644 --- a/satellite/audit/pathcollector.go +++ b/satellite/audit/collector.go @@ -11,20 +11,18 @@ import ( "storj.io/storj/satellite/metainfo" ) -var _ metainfo.Observer = (*PathCollector)(nil) +var _ metainfo.Observer = (*Collector)(nil) -// PathCollector uses the metainfo loop to add paths to node reservoirs. -// -// architecture: Observer -type PathCollector struct { +// Collector uses the metainfo loop to add segments to node reservoirs. +type Collector struct { Reservoirs map[storj.NodeID]*Reservoir slotCount int rand *rand.Rand } -// NewPathCollector instantiates a path collector. -func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector { - return &PathCollector{ +// NewCollector instantiates a segment collector. +func NewCollector(reservoirSlots int, r *rand.Rand) *Collector { + return &Collector{ Reservoirs: make(map[storj.NodeID]*Reservoir), slotCount: reservoirSlots, rand: r, @@ -32,24 +30,22 @@ func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector { } // RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already. 
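Because Segment (defined in reservoir.go further down) is a plain comparable struct, it can be used directly as a map key for de-duplication in the queue-building loop above, where the old code keyed on the encoded storj.Path string. A minimal sketch of that interleaving as a standalone helper inside the audit package — buildQueue is a hypothetical name, not part of this change:

```go
package audit

import "storj.io/common/storj"

// buildQueue restates the chore's interleaving: take slot i from every node's
// reservoir, skip empty slots, and rely on Segment being comparable to
// de-duplicate before appending to the queue.
func buildQueue(reservoirs map[storj.NodeID]*Reservoir, slots int) []Segment {
	var newQueue []Segment
	seen := make(map[Segment]struct{})
	for i := 0; i < slots; i++ {
		for _, res := range reservoirs {
			if len(res.Segments) <= i {
				continue
			}
			segment := res.Segments[i]
			if segment == (Segment{}) {
				continue
			}
			if _, ok := seen[segment]; !ok {
				newQueue = append(newQueue, segment)
				seen[segment] = struct{}{}
			}
		}
	}
	return newQueue
}
```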
-func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { - // TODO change Sample to accept SegmentLocation - key := string(segment.Location.Encode()) +func (collector *Collector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { for _, piece := range segment.Pieces { if _, ok := collector.Reservoirs[piece.StorageNode]; !ok { collector.Reservoirs[piece.StorageNode] = NewReservoir(collector.slotCount) } - collector.Reservoirs[piece.StorageNode].Sample(collector.rand, key) + collector.Reservoirs[piece.StorageNode].Sample(collector.rand, NewSegment(segment)) } return nil } // Object returns nil because the audit service does not interact with objects. -func (collector *PathCollector) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (collector *Collector) Object(ctx context.Context, object *metainfo.Object) (err error) { return nil } // InlineSegment returns nil because we're only auditing for storage nodes for now. -func (collector *PathCollector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (collector *Collector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { return nil } diff --git a/satellite/audit/pathcollector_test.go b/satellite/audit/collector_test.go similarity index 75% rename from satellite/audit/pathcollector_test.go rename to satellite/audit/collector_test.go index aeeed82f3..f5f2b5809 100644 --- a/satellite/audit/pathcollector_test.go +++ b/satellite/audit/collector_test.go @@ -13,14 +13,13 @@ import ( "github.com/stretchr/testify/require" "storj.io/common/memory" - "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite/audit" ) -// TestAuditPathCollector does the following: +// TestAuditCollector does the following: // - start testplanet with 5 nodes and a reservoir size of 3 // - upload 5 files // - iterate over all the segments in satellite.Metainfo and store them in allPieces map @@ -30,7 +29,7 @@ import ( // - expect that there is a reservoir for that node on the audit observer // - that the reservoir size is <= 2 (the maxReservoirSize) // - that every item in the reservoir is unique -func TestAuditPathCollector(t *testing.T) { +func TestAuditCollector(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ @@ -51,23 +50,23 @@ func TestAuditPathCollector(t *testing.T) { } r := rand.New(rand.NewSource(time.Now().Unix())) - observer := audit.NewPathCollector(4, r) + observer := audit.NewCollector(4, r) err := satellite.Metainfo.Loop.Join(ctx, observer) require.NoError(t, err) for _, node := range planet.StorageNodes { // expect a reservoir for every node require.NotNil(t, observer.Reservoirs[node.ID()]) - require.True(t, len(observer.Reservoirs[node.ID()].Paths) > 1) + require.True(t, len(observer.Reservoirs[node.ID()].Segments) > 1) - // Require that len paths are <= 3 even though the PathCollector was instantiated with 4 + // Require that len segments are <= 3 even though the Collector was instantiated with 4 // because the maxReservoirSize is currently 3. 
- require.True(t, len(observer.Reservoirs[node.ID()].Paths) <= 3) + require.True(t, len(observer.Reservoirs[node.ID()].Segments) <= 3) - repeats := make(map[storj.Path]bool) - for _, path := range observer.Reservoirs[node.ID()].Paths { - assert.False(t, repeats[path], "expected every item in reservoir to be unique") - repeats[path] = true + repeats := make(map[audit.Segment]bool) + for _, segment := range observer.Reservoirs[node.ID()].Segments { + assert.False(t, repeats[segment], "expected every item in reservoir to be unique") + repeats[segment] = true } } }) diff --git a/satellite/audit/containment.go b/satellite/audit/containment.go index ae18afd27..d3e1a769e 100644 --- a/satellite/audit/containment.go +++ b/satellite/audit/containment.go @@ -10,6 +10,7 @@ import ( "storj.io/common/pb" "storj.io/common/storj" + "storj.io/storj/satellite/metainfo/metabase" ) var ( @@ -27,11 +28,11 @@ var ( type PendingAudit struct { NodeID storj.NodeID PieceID storj.PieceID - StripeIndex int64 + StripeIndex int32 ShareSize int32 ExpectedShareHash []byte ReverifyCount int32 - Path storj.Path + Segment metabase.SegmentLocation } // Containment holds information about pending audits for contained nodes. diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go index dfb044412..0c9756fb9 100644 --- a/satellite/audit/disqualification_test.go +++ b/satellite/audit/disqualification_test.go @@ -21,7 +21,6 @@ import ( "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metainfo/metabase" "storj.io/storj/satellite/overlay" - "storj.io/uplink/private/storage/meta" ) // TestDisqualificationTooManyFailedAudits does the following: @@ -63,7 +62,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) { // failed audits. iterations := 1 for ; ; iterations++ { - _, err := satellitePeer.Audit.Reporter.RecordAudits(ctx, report, "") + _, err := satellitePeer.Audit.Reporter.RecordAudits(ctx, report) require.NoError(t, err) dossier, err := satellitePeer.Overlay.Service.Get(ctx, nodeID) @@ -124,20 +123,19 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) { bucket := metabase.BucketLocation{ProjectID: uplinkPeer.Projects[0].ID, BucketName: "testbucket"} - items, _, err := satellitePeer.Metainfo.Service.List(ctx, metabase.SegmentKey{}, "", true, 10, meta.All) + segments, err := satellitePeer.Metainfo.Metabase.TestingAllSegments(ctx) require.NoError(t, err) - require.Equal(t, 1, len(items)) + require.Equal(t, 1, len(segments)) - pointer, err := satellitePeer.Metainfo.Service.Get(ctx, metabase.SegmentKey(items[0].Path)) - require.NoError(t, err) + segment := segments[0] + disqualifiedNode := segment.Pieces[0].StorageNode - disqualifiedNode := pointer.GetRemote().GetRemotePieces()[0].NodeId err = satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode) require.NoError(t, err) - limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, pointer) + limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment) require.NoError(t, err) - assert.Len(t, limits, len(pointer.GetRemote().GetRemotePieces())-1) + assert.Len(t, limits, len(segment.Pieces)-1) for _, orderLimit := range limits { assert.False(t, isDisqualified(t, ctx, satellitePeer, orderLimit.Limit.StorageNodeId)) diff --git a/satellite/audit/getshare_test.go b/satellite/audit/getshare_test.go index 6187f60d0..ae0c74b03 100644 --- a/satellite/audit/getshare_test.go +++ b/satellite/audit/getshare_test.go @@ -58,7 +58,7 @@ func 
reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo verifier := audit.NewVerifier( zaptest.NewLogger(t).Named("a-special-verifier"), - sat.Metainfo.Service, + sat.Metainfo.Metabase, newDialer, sat.Overlay.Service, sat.DB.Containment(), @@ -89,16 +89,16 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := testSatellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) - require.NoError(t, err) - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path)) + segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, nil) + orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) // find any non-nil limit @@ -116,7 +116,7 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) { mock := &mockConnector{} verifier := reformVerifierWithMockConnector(t, testSatellite, mock) - share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, shareSize, orderNum) + share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) require.NoError(t, err) require.NoError(t, share.Error) @@ -143,16 +143,16 @@ func TestGetSharePrefers(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := testSatellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) - require.NoError(t, err) - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path)) + segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, nil) + orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) require.GreaterOrEqual(t, len(orderLimits), 1) @@ -179,7 +179,7 @@ func TestGetSharePrefers(t *testing.T) { } verifier := reformVerifierWithMockConnector(t, testSatellite, mock) - share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, shareSize, orderNum) + share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) require.NoError(t, err) require.NoError(t, share.Error) diff --git a/satellite/audit/integration_test.go b/satellite/audit/integration_test.go index 6c2d7267e..56cae9b4c 100644 --- a/satellite/audit/integration_test.go +++ b/satellite/audit/integration_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "storj.io/common/memory" - "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" 
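The test changes above all follow the same lookup pattern: the queue entry is now an audit.Segment carrying StreamID and Position, and the full record is re-read from the metabase when redundancy or piece information is needed, with bucket information coming from queueSegment.Bucket(). A condensed sketch of that flow, assuming GetSegmentByPosition returns a metabase.Segment as the calls above suggest (nextSegmentInfo is a hypothetical helper):

```go
package audit_test

import (
	"context"

	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
)

// nextSegmentInfo pops the next queued segment and loads its metabase record.
func nextSegmentInfo(ctx context.Context, queue *audit.Queue, db metainfo.MetabaseDB) (metabase.Segment, error) {
	queueSegment, err := queue.Next()
	if err != nil {
		return metabase.Segment{}, err
	}
	return db.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
		StreamID: queueSegment.StreamID,
		Position: queueSegment.Position,
	})
}
```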
@@ -40,23 +39,23 @@ func TestChoreAndWorkerIntegration(t *testing.T) { queue := audits.Queues.Fetch() require.EqualValues(t, 2, queue.Size(), "audit queue") - uniquePaths := make(map[storj.Path]struct{}) + uniqueSegments := make(map[audit.Segment]struct{}) var err error - var path storj.Path - var pathCount int + var segment audit.Segment + var segmentCount int for { - path, err = queue.Next() + segment, err = queue.Next() if err != nil { break } - pathCount++ - _, ok := uniquePaths[path] - require.False(t, ok, "expected unique path in chore queue") + segmentCount++ + _, ok := uniqueSegments[segment] + require.False(t, ok, "expected unique segment in chore queue") - uniquePaths[path] = struct{}{} + uniqueSegments[segment] = struct{}{} } require.True(t, audit.ErrEmptyQueue.Has(err)) - require.Equal(t, 2, pathCount) + require.Equal(t, 2, segmentCount) require.Equal(t, 0, queue.Size()) // Repopulate the queue for the worker. diff --git a/satellite/audit/queue.go b/satellite/audit/queue.go index 2c18f3548..f602e7485 100644 --- a/satellite/audit/queue.go +++ b/satellite/audit/queue.go @@ -8,30 +8,28 @@ import ( "sync" "github.com/zeebo/errs" - - "storj.io/common/storj" ) // ErrEmptyQueue is used to indicate that the queue is empty. var ErrEmptyQueue = errs.Class("empty audit queue") -// Queue is a list of paths to audit, shared between the reservoir chore and audit workers. +// Queue is a list of segments to audit, shared between the reservoir chore and audit workers. // It is not safe for concurrent use. type Queue struct { - queue []storj.Path + queue []Segment } // NewQueue creates a new audit queue. -func NewQueue(paths []storj.Path) *Queue { +func NewQueue(segments []Segment) *Queue { return &Queue{ - queue: paths, + queue: segments, } } // Next gets the next item in the queue. -func (q *Queue) Next() (storj.Path, error) { +func (q *Queue) Next() (Segment, error) { if len(q.queue) == 0 { - return "", ErrEmptyQueue.New("") + return Segment{}, ErrEmptyQueue.New("") } next := q.queue[0] @@ -60,7 +58,7 @@ type Queues struct { // NewQueues creates a new Queues object. func NewQueues() *Queues { queues := &Queues{ - nextQueue: NewQueue([]storj.Path{}), + nextQueue: NewQueue([]Segment{}), } return queues } @@ -78,7 +76,7 @@ func (queues *Queues) Fetch() *Queue { if queues.swapQueue != nil { queues.swapQueue() } else { - queues.nextQueue = NewQueue([]storj.Path{}) + queues.nextQueue = NewQueue([]Segment{}) } return active @@ -88,7 +86,7 @@ func (queues *Queues) Fetch() *Queue { // Push adds a pending queue to be swapped in when ready. // If nextQueue is empty, it immediately replaces the queue. Otherwise it creates a swapQueue callback to be called when nextQueue is fetched. // Only one call to Push is permitted at a time, otherwise it will return ErrPendingQueueInProgress. 
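Aside from the element type, the Queues push/fetch cycle above is unchanged; a short usage sketch (drain is a hypothetical helper — in the real flow the chore calls Push and the workers run the Fetch/Next loop):

```go
// drain pushes a fresh list of segments and consumes the active queue
// until ErrEmptyQueue is returned.
func drain(queues *Queues, segments []Segment) error {
	if err := queues.Push(segments); err != nil {
		return err
	}
	queue := queues.Fetch()
	for {
		segment, err := queue.Next()
		if ErrEmptyQueue.Has(err) {
			return nil
		}
		if err != nil {
			return err
		}
		_ = segment // hand the segment to the verifier here
	}
}
```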
-func (queues *Queues) Push(pendingQueue []storj.Path) error { +func (queues *Queues) Push(pendingQueue []Segment) error { queues.mu.Lock() defer queues.mu.Unlock() diff --git a/satellite/audit/queue_test.go b/satellite/audit/queue_test.go index fe7663faa..8e622763a 100644 --- a/satellite/audit/queue_test.go +++ b/satellite/audit/queue_test.go @@ -11,8 +11,9 @@ import ( "golang.org/x/sync/errgroup" "storj.io/common/errs2" - "storj.io/common/storj" "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/satellite/metainfo/metabase" ) func TestQueues(t *testing.T) { @@ -25,7 +26,7 @@ func TestQueues(t *testing.T) { _, err := q.Next() require.True(t, ErrEmptyQueue.Has(err), "required ErrEmptyQueue error") - testQueue1 := []storj.Path{"a", "b", "c"} + testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")} err = queues.Push(testQueue1) require.NoError(t, err) err = queues.WaitForSwap(ctx) @@ -47,14 +48,14 @@ func TestQueuesPush(t *testing.T) { queues := NewQueues() // when next queue is empty, WaitForSwap should return immediately - testQueue1 := []storj.Path{"a", "b", "c"} + testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")} err := queues.Push(testQueue1) require.NoError(t, err) err = queues.WaitForSwap(ctx) require.NoError(t, err) // second call to WaitForSwap should block until Fetch is called the first time - testQueue2 := []storj.Path{"d", "e"} + testQueue2 := []Segment{testSegment("d"), testSegment("e")} err = queues.Push(testQueue2) require.NoError(t, err) var group errgroup.Group @@ -86,14 +87,14 @@ func TestQueuesPushCancel(t *testing.T) { queues := NewQueues() // when queue is empty, WaitForSwap should return immediately - testQueue1 := []storj.Path{"a", "b", "c"} + testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")} err := queues.Push(testQueue1) require.NoError(t, err) err = queues.WaitForSwap(ctx) require.NoError(t, err) ctxWithCancel, cancel := context.WithCancel(ctx) - testQueue2 := []storj.Path{"d", "e"} + testQueue2 := []Segment{testSegment("d"), testSegment("e")} err = queues.Push(testQueue2) require.NoError(t, err) var group errgroup.Group @@ -112,3 +113,14 @@ func TestQueuesPushCancel(t *testing.T) { err = group.Wait() require.NoError(t, err) } + +func testSegment(objectKey string) Segment { + return Segment{ + SegmentLocation: metabase.SegmentLocation{ + ProjectID: testrand.UUID(), + BucketName: "test", + ObjectKey: metabase.ObjectKey(objectKey), + }, + StreamID: testrand.UUID(), + } +} diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go index 70c5d839a..d628b1786 100644 --- a/satellite/audit/reporter.go +++ b/satellite/audit/reporter.go @@ -46,7 +46,7 @@ func NewReporter(log *zap.Logger, overlay *overlay.Service, containment Containm // RecordAudits saves audit results to overlay. When no error, it returns // nil for both return values, otherwise it returns the report with the fields // set to the values which have been saved and the error. 
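In reporter.go below, RecordAudits drops its per-call path argument; each PendingAudit in the report now carries its own SegmentLocation (see containment.go above), which is what recordPendingAudits logs for failures. A call-site sketch with assumed variable names (reporter, successfulNode, offlineNode):

```go
report := audit.Report{
	Successes: storj.NodeIDList{successfulNode},
	Offlines:  storj.NodeIDList{offlineNode},
}
_, err := reporter.RecordAudits(ctx, report)
```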
-func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path storj.Path) (_ Report, err error) { +func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error) { defer mon.Task()(&ctx)(&err) successes := req.Successes @@ -241,7 +241,7 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits if len(failed) > 0 { for _, v := range failed { - reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path)) + reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.ByteString("Segment Location", []byte(v.Segment.Encode()))) } return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err()) } diff --git a/satellite/audit/reporter_test.go b/satellite/audit/reporter_test.go index 35a4e133b..dcb7513b5 100644 --- a/satellite/audit/reporter_test.go +++ b/satellite/audit/reporter_test.go @@ -42,7 +42,7 @@ func TestReportPendingAudits(t *testing.T) { overlay := satellite.Overlay.Service containment := satellite.DB.Containment() - failed, err := audits.Reporter.RecordAudits(ctx, report, "") + failed, err := audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) assert.Zero(t, failed) @@ -69,7 +69,7 @@ func TestRecordAuditsAtLeastOnce(t *testing.T) { report := audit.Report{Successes: []storj.NodeID{nodeID}} // expect RecordAudits to try recording at least once (maxRetries is set to 0) - failed, err := audits.Reporter.RecordAudits(ctx, report, "") + failed, err := audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) require.Zero(t, failed) @@ -107,13 +107,12 @@ func TestRecordAuditsCorrectOutcome(t *testing.T) { ShareSize: 10, ExpectedShareHash: []byte{}, ReverifyCount: 0, - Path: "", }, }, Offlines: []storj.NodeID{offlineNode}, } - failed, err := audits.Reporter.RecordAudits(ctx, report, "") + failed, err := audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) require.Zero(t, failed) @@ -155,7 +154,7 @@ func TestSuspensionTimeNotResetBySuccessiveAudit(t *testing.T) { suspendedNode := planet.StorageNodes[0].ID() - failed, err := audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}, "") + failed, err := audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}) require.NoError(t, err) require.Zero(t, failed) @@ -168,7 +167,7 @@ func TestSuspensionTimeNotResetBySuccessiveAudit(t *testing.T) { suspendedAt := node.UnknownAuditSuspended - failed, err = audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}, "") + failed, err = audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}) require.NoError(t, err) require.Zero(t, failed) @@ -224,7 +223,7 @@ func TestGracefullyExitedNotUpdated(t *testing.T) { PendingAudits: []*audit.PendingAudit{&pending}, Unknown: storj.NodeIDList{unknownNode.ID()}, } - failed, err := audits.Reporter.RecordAudits(ctx, report, "") + failed, err := audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) assert.Zero(t, failed) @@ -253,7 +252,7 @@ func TestReportOfflineAudits(t *testing.T) { audits.Worker.Loop.Pause() cache := satellite.Overlay.DB - _, err := audits.Reporter.RecordAudits(ctx, audit.Report{Offlines: storj.NodeIDList{node.ID()}}, "") + _, err := audits.Reporter.RecordAudits(ctx, audit.Report{Offlines: storj.NodeIDList{node.ID()}}) require.NoError(t, err) d, err := cache.Get(ctx, node.ID()) diff --git 
a/satellite/audit/reservoir.go b/satellite/audit/reservoir.go index da61ebad7..770172cf7 100644 --- a/satellite/audit/reservoir.go +++ b/satellite/audit/reservoir.go @@ -5,17 +5,20 @@ package audit import ( "math/rand" + "time" - "storj.io/common/storj" + "storj.io/common/uuid" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metabase" ) const maxReservoirSize = 3 // Reservoir holds a certain number of segments to reflect a random sample. type Reservoir struct { - Paths [maxReservoirSize]storj.Path - size int8 - index int64 + Segments [maxReservoirSize]Segment + size int8 + index int64 } // NewReservoir instantiates a Reservoir. @@ -33,14 +36,35 @@ func NewReservoir(size int) *Reservoir { // Sample makes sure that for every segment in metainfo from index i=size..n-1, // pick a random number r = rand(0..i), and if r < size, replace reservoir.Segments[r] with segment. -func (reservoir *Reservoir) Sample(r *rand.Rand, path storj.Path) { +func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment) { reservoir.index++ if reservoir.index < int64(reservoir.size) { - reservoir.Paths[reservoir.index] = path + reservoir.Segments[reservoir.index] = segment } else { random := r.Int63n(reservoir.index) if random < int64(reservoir.size) { - reservoir.Paths[random] = path + reservoir.Segments[random] = segment } } } + +// Segment is a segment to audit. +type Segment struct { + metabase.SegmentLocation + StreamID uuid.UUID + ExpirationDate time.Time +} + +// NewSegment creates a new segment to audit from a metainfo loop segment. +func NewSegment(loopSegment *metainfo.Segment) Segment { + return Segment{ + SegmentLocation: loopSegment.Location, + StreamID: loopSegment.StreamID, + ExpirationDate: loopSegment.ExpirationDate, + } +} + +// Expired checks if segment is expired relative to now. 
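+// A zero ExpirationDate means the segment never expires.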
+func (segment *Segment) Expired(now time.Time) bool { + return !segment.ExpirationDate.IsZero() && segment.ExpirationDate.Before(now) +} diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 12d14977f..211408889 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -13,7 +13,6 @@ import ( "go.uber.org/zap" "storj.io/common/memory" - "storj.io/common/pb" "storj.io/common/peertls/tlsopts" "storj.io/common/pkcrypto" "storj.io/common/rpc" @@ -25,7 +24,6 @@ import ( "storj.io/storj/satellite" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metainfo/metabase" - "storj.io/storj/storage" "storj.io/storj/storagenode" ) @@ -55,53 +53,55 @@ func TestReverifySuccess(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment.Redundancy.ShareSize + pieces := segment.Pieces + rootPieceID := segment.RootPieceID - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize) require.NoError(t, err) - share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum)) + share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number)) require.NoError(t, err) pending := &audit.PendingAudit{ - NodeID: pieces[0].NodeId, + NodeID: pieces[0].StorageNode, PieceID: rootPieceID, StripeIndex: randomIndex, ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = containment.IncrementPending(ctx, pending) require.NoError(t, err) - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Fails, 0) require.Len(t, report.Offlines, 0) require.Len(t, report.PendingAudits, 0) require.Len(t, report.Successes, 1) - require.Equal(t, report.Successes[0], pieces[0].NodeId) + require.Equal(t, report.Successes[0], pieces[0].StorageNode) // record audit - _, err = audits.Reporter.RecordAudits(ctx, report, path) + _, err = audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) // make sure that pending audit is removed by the reporter when audit is recorded @@ -135,60 +135,62 @@ func TestReverifyFailMissingShare(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := 
queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment.Redundancy.ShareSize + pieces := segment.Pieces + rootPieceID := segment.RootPieceID - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize) require.NoError(t, err) - share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum)) + share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number)) require.NoError(t, err) pending := &audit.PendingAudit{ - NodeID: pieces[0].NodeId, + NodeID: pieces[0].StorageNode, PieceID: rootPieceID, StripeIndex: randomIndex, ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = containment.IncrementPending(ctx, pending) require.NoError(t, err) // delete the piece from the first node - piece := pointer.GetRemote().GetRemotePieces()[0] - pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum) - node := planet.FindNode(piece.NodeId) + piece := pieces[0] + pieceID := rootPieceID.Derive(piece.StorageNode, int32(piece.Number)) + node := planet.FindNode(piece.StorageNode) err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID) require.NoError(t, err) - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) require.Len(t, report.Offlines, 0) require.Len(t, report.PendingAudits, 0) require.Len(t, report.Fails, 1) - require.Equal(t, report.Fails[0], pieces[0].NodeId) + require.Equal(t, report.Fails[0], pieces[0].StorageNode) // record audit - _, err = audits.Reporter.RecordAudits(ctx, report, path) + _, err = audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) // make sure that pending audit is removed by the reporter when audit is recorded @@ -222,34 +224,37 @@ func TestReverifyFailBadData(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, 
pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - redundancy := pointer.GetRemote().GetRedundancy() + pieces := segment.Pieces + rootPieceID := segment.RootPieceID + redundancy := segment.Redundancy pending := &audit.PendingAudit{ - NodeID: pieces[0].NodeId, + NodeID: pieces[0].StorageNode, PieceID: rootPieceID, StripeIndex: randomIndex, - ShareSize: redundancy.ErasureShareSize, + ShareSize: redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(nil), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = satellite.DB.Containment().IncrementPending(ctx, pending) require.NoError(t, err) - nodeID := pieces[0].NodeId - report, err := audits.Verifier.Reverify(ctx, path) + nodeID := pieces[0].StorageNode + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -259,7 +264,7 @@ func TestReverifyFailBadData(t *testing.T) { require.Equal(t, report.Fails[0], nodeID) // record audit - _, err = audits.Reporter.RecordAudits(ctx, report, path) + _, err = audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) // make sure that pending audit is removed by the reporter when audit is recorded @@ -294,43 +299,46 @@ func TestReverifyOffline(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - redundancy := pointer.GetRemote().GetRedundancy() + pieces := segment.Pieces + rootPieceID := segment.RootPieceID + redundancy := segment.Redundancy pending := &audit.PendingAudit{ - NodeID: pieces[0].NodeId, + NodeID: pieces[0].StorageNode, PieceID: rootPieceID, StripeIndex: randomIndex, - ShareSize: redundancy.ErasureShareSize, + ShareSize: redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = satellite.DB.Containment().IncrementPending(ctx, pending) require.NoError(t, err) - err = planet.StopNodeAndUpdate(ctx, planet.FindNode(pieces[0].NodeId)) + err = planet.StopNodeAndUpdate(ctx, planet.FindNode(pieces[0].StorageNode)) require.NoError(t, err) - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) require.Len(t, report.Fails, 0) require.Len(t, report.PendingAudits, 0) require.Len(t, report.Offlines, 1) - require.Equal(t, report.Offlines[0], pieces[0].NodeId) + require.Equal(t, report.Offlines[0], pieces[0].StorageNode) // make sure that pending audit is not removed containment := satellite.DB.Containment() @@ -364,13 +372,16 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() 
require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil) @@ -391,7 +402,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { verifier := audit.NewVerifier( satellite.Log.Named("verifier"), - satellite.Metainfo.Service, + satellite.Metainfo.Metabase, dialer, satellite.Overlay.Service, satellite.DB.Containment(), @@ -400,25 +411,24 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { minBytesPerSecond, 5*time.Second) - pieces := pointer.GetRemote().GetRemotePieces() - - rootPieceID := pointer.GetRemote().RootPieceId - redundancy := pointer.GetRemote().GetRedundancy() + pieces := segment.Pieces + rootPieceID := segment.RootPieceID + redundancy := segment.Redundancy pending := &audit.PendingAudit{ - NodeID: pieces[0].NodeId, + NodeID: pieces[0].StorageNode, PieceID: rootPieceID, StripeIndex: randomIndex, - ShareSize: redundancy.ErasureShareSize, + ShareSize: redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(nil), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = satellite.DB.Containment().IncrementPending(ctx, pending) require.NoError(t, err) - report, err := verifier.Reverify(ctx, path) + report, err := verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -442,7 +452,7 @@ func TestReverifyDeletedSegment(t *testing.T) { }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { // - uploads random data to all nodes - // - gets a path from the audit queue + // - gets a segment from the audit queue // - creates one pending audit for a node holding a piece for that segment // - deletes the file // - calls reverify on the deleted file @@ -463,24 +473,27 @@ func TestReverifyDeletedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId + nodeID := segment.Pieces[0].StorageNode pending := &audit.PendingAudit{ NodeID: nodeID, - PieceID: pointer.GetRemote().RootPieceId, + PieceID: segment.RootPieceID, StripeIndex: randomIndex, - ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(), + ShareSize: segment.Redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(nil), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } containment := satellite.DB.Containment() @@ -493,7 +506,7 @@ func TestReverifyDeletedSegment(t *testing.T) { // call reverify on the deleted file and expect no error // but expect that the node is still in containment - report, err := audits.Verifier.Reverify(ctx, path) + report, 
err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) assert.Empty(t, report) @@ -507,11 +520,11 @@ func TestReverifyDeletedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue = audits.Queues.Fetch() - path, err = queue.Next() + queueSegment, err = queue.Next() require.NoError(t, err) - // reverify the new path - report, err = audits.Verifier.Reverify(ctx, path) + // reverify the new segment + report, err = audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) assert.Empty(t, report.Fails) assert.Empty(t, report.Successes) @@ -538,7 +551,6 @@ func TestReverifyModifiedSegment(t *testing.T) { satellite := planet.Satellites[0] audits := satellite.Audit - metainfo := satellite.Metainfo.Service audits.Worker.Loop.Pause() audits.Chore.Loop.Pause() @@ -550,24 +562,27 @@ func TestReverifyModifiedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - pendingPath, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(pendingPath)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId + nodeID := segment.Pieces[0].StorageNode pending := &audit.PendingAudit{ NodeID: nodeID, - PieceID: pointer.GetRemote().RootPieceId, + PieceID: segment.RootPieceID, StripeIndex: randomIndex, - ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(), + ShareSize: segment.Redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(nil), ReverifyCount: 0, - Path: pendingPath, + Segment: queueSegment.SegmentLocation, } containment := satellite.DB.Containment() @@ -577,8 +592,12 @@ func TestReverifyModifiedSegment(t *testing.T) { // remove a piece from the file (a piece that the contained node isn't holding) audits.Verifier.OnTestingCheckSegmentAlteredHook = func() { - pieceToRemove := pointer.Remote.RemotePieces[1] - _, err = metainfo.UpdatePieces(ctx, metabase.SegmentKey(pendingPath), pointer, nil, []*pb.RemotePiece{pieceToRemove}) + err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + OldPieces: segment.Pieces, + NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + }) require.NoError(t, err) } @@ -587,20 +606,20 @@ func TestReverifyModifiedSegment(t *testing.T) { err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2) require.NoError(t, err) - // select the encrypted path that was not used for the pending audit + // select the segment that was not used for the pending audit audits.Chore.Loop.TriggerWait() queue = audits.Queues.Fetch() - path1, err := queue.Next() + queueSegment1, err := queue.Next() require.NoError(t, err) - path2, err := queue.Next() + queueSegment2, err := queue.Next() require.NoError(t, err) - reverifyPath := path1 - if path1 == pendingPath { - reverifyPath = path2 + reverifySegment := queueSegment1 + if queueSegment1 == queueSegment { + reverifySegment = queueSegment2 } - // reverify the path that was not modified - report, err := audits.Verifier.Reverify(ctx, reverifyPath) + // reverify the segment that was not modified 
+ report, err := audits.Verifier.Reverify(ctx, reverifySegment) require.NoError(t, err) assert.Empty(t, report.Fails) assert.Empty(t, report.Successes) @@ -638,24 +657,27 @@ func TestReverifyReplacedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - pendingPath, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(pendingPath)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId + nodeID := segment.Pieces[0].StorageNode pending := &audit.PendingAudit{ NodeID: nodeID, - PieceID: pointer.GetRemote().RootPieceId, + PieceID: segment.RootPieceID, StripeIndex: randomIndex, - ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(), + ShareSize: segment.Redundancy.ShareSize, ExpectedShareHash: pkcrypto.SHA256Hash(nil), ReverifyCount: 0, - Path: pendingPath, + Segment: queueSegment.SegmentLocation, } containment := satellite.DB.Containment() @@ -672,20 +694,20 @@ func TestReverifyReplacedSegment(t *testing.T) { err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2) require.NoError(t, err) - // select the encrypted path that was not used for the pending audit + // select the segment that was not used for the pending audit audits.Chore.Loop.TriggerWait() queue = audits.Queues.Fetch() - path1, err := queue.Next() + queueSegment1, err := queue.Next() require.NoError(t, err) - path2, err := queue.Next() + queueSegment2, err := queue.Next() require.NoError(t, err) - reverifyPath := path1 - if path1 == pendingPath { - reverifyPath = path2 + reverifySegment := queueSegment1 + if queueSegment1 == queueSegment { + reverifySegment = queueSegment2 } - // reverify the path that was not modified - report, err := audits.Verifier.Reverify(ctx, reverifyPath) + // reverify the segment that was not modified + report, err := audits.Verifier.Reverify(ctx, reverifySegment) require.NoError(t, err) assert.Empty(t, report.Fails) assert.Empty(t, report.Successes) @@ -732,46 +754,52 @@ func TestReverifyDifferentShare(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path1, err := queue.Next() + queueSegment1, err := queue.Next() require.NoError(t, err) - path2, err := queue.Next() + queueSegment2, err := queue.Next() require.NoError(t, err) - require.NotEqual(t, path1, path2) + require.NotEqual(t, queueSegment1, queueSegment2) - pointer1, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path1)) + segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment1.StreamID, + Position: queueSegment1.Position, + }) require.NoError(t, err) - pointer2, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path2)) + + segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment2.StreamID, + Position: queueSegment2.Position, + }) require.NoError(t, err) // find a node that contains a piece for both files - // save that node ID and the piece number associated with it for pointer1 + // save that node ID and the piece number associated with it for 
segment1 var selectedNode storj.NodeID - var selectedPieceNum int32 - p1Nodes := make(map[storj.NodeID]int32) - for _, piece := range pointer1.GetRemote().GetRemotePieces() { - p1Nodes[piece.NodeId] = piece.PieceNum + var selectedPieceNum uint16 + p1Nodes := make(map[storj.NodeID]uint16) + for _, piece := range segment1.Pieces { + p1Nodes[piece.StorageNode] = piece.Number } - for _, piece := range pointer2.GetRemote().GetRemotePieces() { - pieceNum, ok := p1Nodes[piece.NodeId] + for _, piece := range segment2.Pieces { + pieceNum, ok := p1Nodes[piece.StorageNode] if ok { - selectedNode = piece.NodeId + selectedNode = piece.StorageNode selectedPieceNum = pieceNum break } } require.NotEqual(t, selectedNode, storj.NodeID{}) - randomIndex, err := audit.GetRandomStripe(ctx, pointer1) + randomIndex, err := audit.GetRandomStripe(ctx, segment1) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment1.Redundancy.ShareSize + rootPieceID := segment1.RootPieceID - rootPieceID := pointer1.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, selectedNode, selectedPieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize) require.NoError(t, err) share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum)) @@ -784,21 +812,21 @@ func TestReverifyDifferentShare(t *testing.T) { ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path1, + Segment: queueSegment1.SegmentLocation, } err = containment.IncrementPending(ctx, pending) require.NoError(t, err) - // delete the piece for pointer1 from the selected node - pieceID := pointer1.GetRemote().RootPieceId.Derive(selectedNode, selectedPieceNum) + // delete the piece for segment1 from the selected node + pieceID := segment1.RootPieceID.Derive(selectedNode, int32(selectedPieceNum)) node := planet.FindNode(selectedNode) err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID) require.NoError(t, err) - // reverify with path 2. Since the selected node was put in containment for path1, - // it should be audited for path1 and fail - report, err := audits.Verifier.Reverify(ctx, path2) + // reverify with segment2. 
Since the selected node was put in containment for segment1, + // it should be audited for segment1 and fail + report, err := audits.Verifier.Reverify(ctx, queueSegment2) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -807,8 +835,8 @@ func TestReverifyDifferentShare(t *testing.T) { require.Len(t, report.Fails, 1) require.Equal(t, report.Fails[0], selectedNode) - // record audit - _, err = audits.Reporter.RecordAudits(ctx, report, path2) + // record audit + _, err = audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) // make sure that pending audit is removed by the reporter when audit is recorded @@ -831,30 +859,21 @@ func TestReverifyExpired1(t *testing.T) { ul := planet.Uplinks[0] testData := testrand.Bytes(8 * memory.KiB) - err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData) + err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour)) require.NoError(t, err) audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - // set pointer's expiration date to be already expired - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) - require.NoError(t, err) - oldPointerBytes, err := pb.Marshal(pointer) - require.NoError(t, err) - newPointer := &pb.Pointer{} - err = pb.Unmarshal(oldPointerBytes, newPointer) - require.NoError(t, err) - newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour) - newPointerBytes, err := pb.Marshal(newPointer) - require.NoError(t, err) - err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes) - require.NoError(t, err) + // move time into the future so the segment is expired + audits.Verifier.SetNow(func() time.Time { + return time.Now().Add(2 * time.Hour) + }) // Reverify should not return an error - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) assert.Len(t, report.Successes, 0) @@ -884,7 +903,7 @@ func TestReverifyExpired2(t *testing.T) { testData1 := testrand.Bytes(8 * memory.KiB) testData2 := testrand.Bytes(8 * memory.KiB) - err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1) + err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path1", testData1, time.Now().Add(1*time.Hour)) require.NoError(t, err) err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2) @@ -892,46 +911,57 @@ func TestReverifyExpired2(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path1, err := queue.Next() + queueSegment1, err := queue.Next() require.NoError(t, err) - path2, err := queue.Next() + queueSegment2, err := queue.Next() require.NoError(t, err) - require.NotEqual(t, path1, path2) + require.NotEqual(t, queueSegment1, queueSegment2) - pointer1, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path1)) + // make sure queueSegment1 is the one with the expiration date + if queueSegment1.ExpirationDate.IsZero() { + queueSegment1, queueSegment2 = queueSegment2, queueSegment1 + } + + segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment1.StreamID, + Position: queueSegment1.Position, + }) require.NoError(t, err) - pointer2, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path2)) + + segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, 
metabase.GetSegmentByPosition{ + StreamID: queueSegment2.StreamID, + Position: queueSegment2.Position, + }) require.NoError(t, err) // find a node that contains a piece for both files - // save that node ID and the piece number associated with it for pointer1 + // save that node ID and the piece number associated with it for segment1 var selectedNode storj.NodeID - var selectedPieceNum int32 - p1Nodes := make(map[storj.NodeID]int32) - for _, piece := range pointer1.GetRemote().GetRemotePieces() { - p1Nodes[piece.NodeId] = piece.PieceNum + var selectedPieceNum uint16 + p1Nodes := make(map[storj.NodeID]uint16) + for _, piece := range segment1.Pieces { + p1Nodes[piece.StorageNode] = piece.Number } - for _, piece := range pointer2.GetRemote().GetRemotePieces() { - pieceNum, ok := p1Nodes[piece.NodeId] + for _, piece := range segment2.Pieces { + pieceNum, ok := p1Nodes[piece.StorageNode] if ok { - selectedNode = piece.NodeId + selectedNode = piece.StorageNode selectedPieceNum = pieceNum break } } require.NotEqual(t, selectedNode, storj.NodeID{}) - randomIndex, err := audit.GetRandomStripe(ctx, pointer1) + randomIndex, err := audit.GetRandomStripe(ctx, segment1) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment1.Redundancy.ShareSize + rootPieceID := segment1.RootPieceID - rootPieceID := pointer1.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, selectedNode, selectedPieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize) require.NoError(t, err) share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum)) @@ -944,29 +974,22 @@ func TestReverifyExpired2(t *testing.T) { ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path1, + Segment: queueSegment1.SegmentLocation, } err = containment.IncrementPending(ctx, pending) require.NoError(t, err) - // update pointer1 to be expired - oldPointerBytes, err := pb.Marshal(pointer1) - require.NoError(t, err) - newPointer := &pb.Pointer{} - err = pb.Unmarshal(oldPointerBytes, newPointer) - require.NoError(t, err) - newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour) - newPointerBytes, err := pb.Marshal(newPointer) - require.NoError(t, err) - err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path1), oldPointerBytes, newPointerBytes) - require.NoError(t, err) + // move time into the future so segment1 is expired + audits.Verifier.SetNow(func() time.Time { + return time.Now().Add(2 * time.Hour) + }) - // reverify with path 2. Since the selected node was put in containment for path1, - // it should be audited for path1 - // since path1 has expired, we expect no failure and we expect that the pointer has been deleted + // reverify with segment2. 
Since the selected node was put in containment for segment1, + // it should be audited for segment1 + // since segment1 has expired, we expect no failure and we expect that the segment has been deleted // and that the selected node has been removed from containment mode - report, err := audits.Verifier.Reverify(ctx, path2) + report, err := audits.Verifier.Reverify(ctx, queueSegment2) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -976,7 +999,7 @@ func TestReverifyExpired2(t *testing.T) { // Reverify should remove the node from containment mode _, err = containment.Get(ctx, pending.NodeID) - require.Error(t, err) + require.True(t, audit.ErrContainedNotFound.Has(err)) }) } @@ -1012,40 +1035,41 @@ func TestReverifySlowDownload(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - slowPiece := pointer.Remote.RemotePieces[0] - slowNode := slowPiece.NodeId + slowPiece := segment.Pieces[0] + slowNode := slowPiece.StorageNode - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment.Redundancy.ShareSize + rootPieceID := segment.RootPieceID - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, slowNode, slowPiece.PieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), slowNode, slowPiece.Number, rootPieceID, shareSize) require.NoError(t, err) - share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum)) + share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number)) require.NoError(t, err) pending := &audit.PendingAudit{ NodeID: slowNode, - PieceID: pointer.Remote.RootPieceId, + PieceID: rootPieceID, StripeIndex: randomIndex, ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = containment.IncrementPending(ctx, pending) @@ -1057,7 +1081,7 @@ func TestReverifySlowDownload(t *testing.T) { delay := 1 * time.Second slowNodeDB.SetLatency(delay) - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -1096,40 +1120,41 @@ func TestReverifyUnknownError(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + 
StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - badPiece := pointer.Remote.RemotePieces[0] - badNode := badPiece.NodeId + badPiece := segment.Pieces[0] + badNode := badPiece.StorageNode - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) orders := satellite.Orders.Service containment := satellite.DB.Containment() - bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"} - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() + shareSize := segment.Redundancy.ShareSize + rootPieceID := segment.RootPieceID - pieces := pointer.GetRemote().GetRemotePieces() - rootPieceID := pointer.GetRemote().RootPieceId - limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, badNode, badPiece.PieceNum, rootPieceID, shareSize) + limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), badNode, badPiece.Number, rootPieceID, shareSize) require.NoError(t, err) - share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum)) + share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(badPiece.Number)) require.NoError(t, err) pending := &audit.PendingAudit{ NodeID: badNode, - PieceID: pointer.Remote.RootPieceId, + PieceID: rootPieceID, StripeIndex: randomIndex, ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), ReverifyCount: 0, - Path: path, + Segment: queueSegment.SegmentLocation, } err = containment.IncrementPending(ctx, pending) @@ -1140,7 +1165,7 @@ func TestReverifyUnknownError(t *testing.T) { // return an error when the satellite requests a share badNodeDB.SetError(errs.New("unknown error")) - report, err := audits.Verifier.Reverify(ctx, path) + report, err := audits.Verifier.Reverify(ctx, queueSegment) require.NoError(t, err) require.Len(t, report.Successes, 0) @@ -1150,8 +1175,8 @@ func TestReverifyUnknownError(t *testing.T) { require.Len(t, report.Unknown, 1) require.Equal(t, report.Unknown[0], badNode) - // record audit - _, err = audits.Reporter.RecordAudits(ctx, report, path) + // record audit + _, err = audits.Reporter.RecordAudits(ctx, report) require.NoError(t, err) // make sure that pending audit is removed by the reporter when audit is recorded diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go index 8a95ac0e5..281acb99a 100644 --- a/satellite/audit/verifier.go +++ b/satellite/audit/verifier.go @@ -27,7 +27,6 @@ import ( "storj.io/storj/satellite/metainfo/metabase" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/overlay" - "storj.io/uplink/private/eestream" "storj.io/uplink/private/piecestore" ) @@ -55,7 +54,7 @@ type Share struct { // architecture: Worker type Verifier struct { log *zap.Logger - metainfo *metainfo.Service + metabase metainfo.MetabaseDB orders *orders.Service auditor *identity.PeerIdentity dialer rpc.Dialer @@ -64,14 +63,15 @@ type Verifier struct { minBytesPerSecond memory.Size minDownloadTimeout time.Duration + nowFn func() time.Time OnTestingCheckSegmentAlteredHook func() } // NewVerifier creates a Verifier. 
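A note on the nowFn field introduced just above: rather than rewriting a stored pointer to force expiration, the verifier now asks an injectable clock what time it is, and tests override that clock through SetNow (as the reverify and verifier expiration tests earlier in this diff do). A minimal, self-contained sketch of the pattern, with illustrative names rather than the real satellite types:

package main

import (
	"fmt"
	"time"
)

// verifier mirrors the pattern: hold a clock function that defaults to
// time.Now and let tests override it instead of mutating stored data.
type verifier struct {
	nowFn func() time.Time
}

func newVerifier() *verifier {
	return &verifier{nowFn: time.Now}
}

// SetNow lets tests control what "now" means for expiry checks.
func (v *verifier) SetNow(nowFn func() time.Time) { v.nowFn = nowFn }

// expired reports whether a segment with the given expiration is expired
// relative to the verifier's clock (the zero time means "never expires").
func (v *verifier) expired(expiresAt time.Time) bool {
	return !expiresAt.IsZero() && expiresAt.Before(v.nowFn())
}

func main() {
	v := newVerifier()
	expiresAt := time.Now().Add(1 * time.Hour)

	fmt.Println(v.expired(expiresAt)) // false: not expired yet

	// A test can move the clock forward instead of rewriting the segment.
	v.SetNow(func() time.Time { return time.Now().Add(2 * time.Hour) })
	fmt.Println(v.expired(expiresAt)) // true: the injected clock is past the expiration
}

The zero time keeps its usual meaning of never expiring, which matches the Expired helper on the metainfo loop's Segment type later in this diff.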
-func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { +func NewVerifier(log *zap.Logger, metabase metainfo.MetabaseDB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { return &Verifier{ log: log, - metainfo: metainfo, + metabase: metabase, orders: orders, auditor: id.PeerIdentity(), dialer: dialer, @@ -79,33 +79,32 @@ func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer, containment: containment, minBytesPerSecond: minBytesPerSecond, minDownloadTimeout: minDownloadTimeout, + nowFn: time.Now, } } // Verify downloads shares then verifies the data correctness at a random stripe. -func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report Report, err error) { +func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error) { defer mon.Task()(&ctx)(&err) - pointerBytes, pointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(path)) + if segment.Expired(verifier.nowFn()) { + verifier.log.Debug("segment expired before Verify") + return Report{}, nil + } + + segmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: segment.StreamID, + Position: segment.Position, + }) if err != nil { - if storj.ErrObjectNotFound.Has(err) { + if metabase.ErrSegmentNotFound.Has(err) { verifier.log.Debug("segment deleted before Verify") return Report{}, nil } return Report{}, err } - if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) { - verifier.log.Debug("segment expired before Verify") - return Report{}, nil - } - randomIndex, err := GetRandomStripe(ctx, pointer) - if err != nil { - return Report{}, err - } - - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path)) + randomIndex, err := GetRandomStripe(ctx, segmentInfo) if err != nil { return Report{}, err } @@ -116,27 +115,27 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ containedNodes := make(map[int]storj.NodeID) sharesToAudit := make(map[int]Share) - orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, skip) + orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segment.Bucket(), segmentInfo, skip) if err != nil { return Report{}, err } // NOTE offlineNodes will include disqualified nodes because they aren't in // the skip list - offlineNodes = getOfflineNodes(pointer, orderLimits, skip) + offlineNodes = getOfflineNodes(segmentInfo, orderLimits, skip) if len(offlineNodes) > 0 { verifier.log.Debug("Verify: order limits not created for some nodes (offline/disqualified)", zap.Strings("Node IDs", offlineNodes.Strings())) } - shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedIPsAndPorts, randomIndex, segmentInfo.Redundancy.ShareSize) if err != nil { return Report{ Offlines: offlineNodes, }, err } - err = 
verifier.checkIfSegmentAltered(ctx, path, pointer, pointerBytes) + err = verifier.checkIfSegmentAltered(ctx, segment.SegmentLocation, segmentInfo) if err != nil { if ErrSegmentDeleted.Has(err) { verifier.log.Debug("segment deleted during Verify") @@ -209,10 +208,10 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ mon.IntVal("verify_shares_downloaded_successfully").Observe(int64(len(sharesToAudit))) //mon:locked - required := int(pointer.Remote.Redundancy.GetMinReq()) - total := int(pointer.Remote.Redundancy.GetTotal()) + required := segmentInfo.Redundancy.RequiredShares + total := segmentInfo.Redundancy.TotalShares - if len(sharesToAudit) < required { + if len(sharesToAudit) < int(required) { mon.Counter("not_enough_shares_for_audit").Inc(1) return Report{ Fails: failedNodes, @@ -238,14 +237,14 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, unknownNodes, containedNodes) - totalInPointer := len(pointer.GetRemote().GetRemotePieces()) + totalInSegment := len(segmentInfo.Pieces) numOffline := len(offlineNodes) numSuccessful := len(successNodes) numFailed := len(failedNodes) numContained := len(containedNodes) numUnknown := len(unknownNodes) totalAudited := numSuccessful + numFailed + numOffline + numContained - auditedPercentage := float64(totalAudited) / float64(totalInPointer) + auditedPercentage := float64(totalAudited) / float64(totalInSegment) offlinePercentage := float64(0) successfulPercentage := float64(0) failedPercentage := float64(0) @@ -265,7 +264,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ mon.Meter("audit_contained_nodes_global").Mark(numContained) //mon:locked mon.Meter("audit_unknown_nodes_global").Mark(numUnknown) //mon:locked mon.Meter("audit_total_nodes_global").Mark(totalAudited) //mon:locked - mon.Meter("audit_total_pointer_nodes_global").Mark(totalInPointer) //mon:locked + mon.Meter("audit_total_pointer_nodes_global").Mark(totalInSegment) //mon:locked mon.IntVal("audit_success_nodes").Observe(int64(numSuccessful)) //mon:locked mon.IntVal("audit_fail_nodes").Observe(int64(numFailed)) //mon:locked @@ -273,7 +272,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ mon.IntVal("audit_contained_nodes").Observe(int64(numContained)) //mon:locked mon.IntVal("audit_unknown_nodes").Observe(int64(numUnknown)) //mon:locked mon.IntVal("audit_total_nodes").Observe(int64(totalAudited)) //mon:locked - mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInPointer)) //mon:locked + mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInSegment)) //mon:locked mon.FloatVal("audited_percentage").Observe(auditedPercentage) //mon:locked mon.FloatVal("audit_offline_percentage").Observe(offlinePercentage) //mon:locked mon.FloatVal("audit_successful_percentage").Observe(successfulPercentage) //mon:locked @@ -281,7 +280,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ mon.FloatVal("audit_contained_percentage").Observe(containedPercentage) //mon:locked mon.FloatVal("audit_unknown_percentage").Observe(unknownPercentage) //mon:locked - pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, pointer, randomIndex, path) + pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, segment, segmentInfo, randomIndex) if err != nil { return Report{ Successes: successNodes, @@ -301,7 +300,7 @@ func 
(verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ } // DownloadShares downloads shares from the nodes where remote pieces are located. -func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int64, shareSize int32) (shares map[int]Share, err error) { +func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int32, shareSize int32) (shares map[int]Share, err error) { defer mon.Task()(&ctx)(&err) shares = make(map[int]Share, len(limits)) @@ -339,7 +338,7 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.Addre } // Reverify reverifies the contained nodes in the stripe. -func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report Report, err error) { +func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error) { defer mon.Task()(&ctx)(&err) // result status enum @@ -360,38 +359,45 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report err error } - pointer, err := verifier.metainfo.Get(ctx, metabase.SegmentKey(path)) + if segment.Expired(verifier.nowFn()) { + verifier.log.Debug("segment expired before Reverify") + return Report{}, nil + } + + segmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: segment.StreamID, + Position: segment.Position, + }) if err != nil { - if storj.ErrObjectNotFound.Has(err) { + if metabase.ErrSegmentNotFound.Has(err) { verifier.log.Debug("segment deleted before Reverify") return Report{}, nil } return Report{}, err } - if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) { - verifier.log.Debug("Segment expired before Reverify") - return Report{}, nil - } - pieces := pointer.GetRemote().GetRemotePieces() + pieces := segmentInfo.Pieces ch := make(chan result, len(pieces)) var containedInSegment int64 for _, piece := range pieces { - pending, err := verifier.containment.Get(ctx, piece.NodeId) + pending, err := verifier.containment.Get(ctx, piece.StorageNode) if err != nil { if ErrContainedNotFound.Has(err) { - ch <- result{nodeID: piece.NodeId, status: skipped} + ch <- result{nodeID: piece.StorageNode, status: skipped} continue } - ch <- result{nodeID: piece.NodeId, status: erred, err: err} - verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", piece.NodeId), zap.Error(err)) + ch <- result{nodeID: piece.StorageNode, status: erred, err: err} + verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", piece.StorageNode), zap.Error(err)) continue } containedInSegment++ go func(pending *PendingAudit) { - pendingPointerBytes, pendingPointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(pending.Path)) + // TODO: Get the exact version of the object. But where to take the version from the pending audit? 
+ pendingObject, err := verifier.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{ + ObjectLocation: pending.Segment.Object(), + }) if err != nil { if storj.ErrObjectNotFound.Has(err) { ch <- result{nodeID: pending.NodeID, status: skipped} @@ -399,24 +405,41 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report } ch <- result{nodeID: pending.NodeID, status: erred, err: err} - verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) + verifier.log.Debug("Reverify: error getting pending segment's object from metabase", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } - if !pendingPointer.ExpirationDate.IsZero() && pendingPointer.ExpirationDate.Before(time.Now().UTC()) { + + if pendingObject.ExpiresAt != nil && !pendingObject.ExpiresAt.IsZero() && pendingObject.ExpiresAt.Before(verifier.nowFn()) { verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID)) ch <- result{nodeID: pending.NodeID, status: skipped} return } - if pendingPointer.GetRemote().RootPieceId != pending.PieceID { + pendingSegmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: pendingObject.StreamID, + Position: pending.Segment.Position, + }) + if err != nil { + if metabase.ErrSegmentNotFound.Has(err) { + ch <- result{nodeID: pending.NodeID, status: skipped} + return + } + + ch <- result{nodeID: pending.NodeID, status: erred, err: err} + verifier.log.Debug("Reverify: error getting pending segment from metabase", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) + return + } + + // TODO: is this check still necessary? If the segment was found by its StreamID and position, the RootPieceID should not have changed.
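For orientation, the lookups above replace the old single pointer fetch: a pending audit now carries a SegmentLocation, so Reverify resolves the object's latest version first, then the segment at that position, skips the node whenever either lookup reports not found, and finally sanity-checks the root piece ID in the condition that follows. A rough sketch of that sequence, using stand-in types rather than the real metabase API:

package main

import "fmt"

// Stand-in types; the real code uses metabase.Object and metabase.Segment.
type object struct{ streamID string }

type segment struct {
	streamID    string
	position    uint64
	rootPieceID string
}

// Hypothetical lookups standing in for GetObjectLatestVersion and
// GetSegmentByPosition; ok == false plays the role of a not-found error.
func getObjectLatestVersion(location string) (object, bool) {
	return object{streamID: "stream-1"}, true
}

func getSegmentByPosition(streamID string, position uint64) (segment, bool) {
	return segment{streamID: streamID, position: position, rootPieceID: "piece-abc"}, true
}

// resolvePending mirrors the reverify flow: object first, then segment,
// then a check that the stored root piece ID still matches the audit.
func resolvePending(objectLocation string, position uint64, expectedRootPieceID string) (segment, bool) {
	obj, ok := getObjectLatestVersion(objectLocation)
	if !ok {
		return segment{}, false // object is gone: skip this pending audit
	}
	seg, ok := getSegmentByPosition(obj.streamID, position)
	if !ok {
		return segment{}, false // segment is gone: skip as well
	}
	if seg.rootPieceID != expectedRootPieceID {
		return segment{}, false // data was replaced since the audit was queued
	}
	return seg, true
}

func main() {
	seg, ok := resolvePending("bucket/encrypted-object-key", 0, "piece-abc")
	fmt.Println(ok, seg.position)
}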
+ if pendingSegmentInfo.RootPieceID != pending.PieceID { ch <- result{nodeID: pending.NodeID, status: skipped} return } - var pieceNum int32 + var pieceNum uint16 found := false - for _, piece := range pendingPointer.GetRemote().GetRemotePieces() { - if piece.NodeId == pending.NodeID { - pieceNum = piece.PieceNum + for _, piece := range pendingSegmentInfo.Pieces { + if piece.StorageNode == pending.NodeID { + pieceNum = piece.Number found = true } } @@ -425,13 +448,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report return } - segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(pending.Path)) // TODO: this should be parsed in pending - if err != nil { - ch <- result{nodeID: pending.NodeID, status: skipped} - return - } - - limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, segmentLocation.Bucket(), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize) + limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.Segment.Bucket(), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize) if err != nil { if overlay.ErrNodeDisqualified.Has(err) { _, errDelete := verifier.containment.Delete(ctx, pending.NodeID) @@ -497,8 +514,8 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report return } if errs2.IsRPC(err, rpcstatus.NotFound) { - // Get the original segment pointer in the metainfo - err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes) + // Get the original segment + err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo) if err != nil { ch <- result{nodeID: pending.NodeID, status: skipped} verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) @@ -525,7 +542,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report ch <- result{nodeID: pending.NodeID, status: success} verifier.log.Info("Reverify: hashes match (audit success)", zap.Stringer("Node ID", pending.NodeID)) } else { - err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes) + err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo) if err != nil { ch <- result{nodeID: pending.NodeID, status: skipped} verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) @@ -580,7 +597,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report } // GetShare use piece store client to download shares from nodes. 
-func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) { +func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error) { defer mon.Task()(&ctx)(&err) bandwidthMsgSize := shareSize @@ -632,7 +649,7 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder } }() - offset := int64(shareSize) * stripeIndex + offset := int64(shareSize) * int64(stripeIndex) downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, offset, int64(shareSize)) if err != nil { @@ -654,38 +671,42 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder }, nil } -// checkIfSegmentAltered checks if path's pointer has been altered since path was selected. -func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, segmentKey string, oldPointer *pb.Pointer, oldPointerBytes []byte) (err error) { +// checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit. +func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, location metabase.SegmentLocation, oldSegment metabase.Segment) (err error) { defer mon.Task()(&ctx)(&err) if verifier.OnTestingCheckSegmentAlteredHook != nil { verifier.OnTestingCheckSegmentAlteredHook() } - newPointerBytes, newPointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(segmentKey)) + newSegment, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: oldSegment.StreamID, + Position: oldSegment.Position, + }) if err != nil { - if storj.ErrObjectNotFound.Has(err) { - return ErrSegmentDeleted.New("%q", segmentKey) + if metabase.ErrSegmentNotFound.Has(err) { + return ErrSegmentDeleted.New("%q", location.Encode()) } return err } - if oldPointer != nil && oldPointer.CreationDate != newPointer.CreationDate { - return ErrSegmentDeleted.New("%q", segmentKey) - } - - if !bytes.Equal(oldPointerBytes, newPointerBytes) { - return ErrSegmentModified.New("%q", segmentKey) + if !oldSegment.Pieces.Equal(newSegment.Pieces) { + return ErrSegmentModified.New("%q", location.Encode()) } return nil } +// SetNow allows tests to have the server act as if the current time is whatever they want. +func (verifier *Verifier) SetNow(nowFn func() time.Time) { + verifier.nowFn = nowFn +} + // auditShares takes the downloaded shares and uses infectious's Correct function to check that they // haven't been altered. auditShares returns a slice containing the piece numbers of altered shares, // and a slice of the corrected shares. 
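The auditShares comment above is effectively the whole algorithm: copy the downloaded shares, let the erasure code correct the copies, and report every piece number whose corrected bytes differ from what the node actually returned. A stand-in sketch of that compare step (the correction itself is delegated to the infectious FEC in the real code and is faked here):

package main

import (
	"bytes"
	"fmt"
)

// share is a stand-in for the audit Share / infectious.Share pair:
// a piece number plus the bytes downloaded for one stripe.
type share struct {
	number int
	data   []byte
}

// correct stands in for the FEC correction pass; here it simply "knows"
// the true stripe contents and rewrites every copy to match them.
func correct(copies []share, truth []byte) {
	for i := range copies {
		copies[i].data = append([]byte(nil), truth...)
	}
}

// findAltered mirrors auditShares: correct copies of the originals and
// return the piece numbers whose downloaded bytes no longer match.
func findAltered(originals []share, truth []byte) (altered []int) {
	copies := make([]share, len(originals))
	for i, orig := range originals {
		copies[i] = share{number: orig.number, data: append([]byte(nil), orig.data...)}
	}
	correct(copies, truth)
	for i, corrected := range copies {
		if !bytes.Equal(originals[i].data, corrected.data) {
			altered = append(altered, originals[i].number)
		}
	}
	return altered
}

func main() {
	truth := []byte{1, 2, 3, 4}
	shares := []share{
		{number: 0, data: []byte{1, 2, 3, 4}}, // intact
		{number: 1, data: []byte{9, 9, 9, 9}}, // tampered with
	}
	fmt.Println(findAltered(shares, truth)) // [1]
}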
-func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) { +func auditShares(ctx context.Context, required, total int16, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) { defer mon.Task()(&ctx)(&err) - f, err := infectious.NewFEC(required, total) + f, err := infectious.NewFEC(int(required), int(total)) if err != nil { return nil, nil, err } @@ -720,9 +741,9 @@ func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectio return copies, nil } -// getOfflines nodes returns these storage nodes from pointer which have no +// getOfflines nodes returns these storage nodes from the segment which have no // order limit nor are skipped. -func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList { +func getOfflineNodes(segment metabase.Segment, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList { var offlines storj.NodeIDList nodesWithLimit := make(map[storj.NodeID]bool, len(limits)) @@ -732,9 +753,9 @@ func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip } } - for _, piece := range pointer.GetRemote().GetRemotePieces() { - if !nodesWithLimit[piece.NodeId] && !skip[piece.NodeId] { - offlines = append(offlines, piece.NodeId) + for _, piece := range segment.Pieces { + if !nodesWithLimit[piece.StorageNode] && !skip[piece.StorageNode] { + offlines = append(offlines, piece.StorageNode) } } @@ -767,17 +788,16 @@ func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, off return successNodes } -func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, pointer *pb.Pointer, randomIndex int64, path storj.Path) (pending []*PendingAudit, err error) { +func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, segment Segment, segmentInfo metabase.Segment, randomIndex int32) (pending []*PendingAudit, err error) { defer mon.Task()(&ctx)(&err) if len(containedNodes) == 0 { return nil, nil } - redundancy := pointer.GetRemote().GetRedundancy() - required := int(redundancy.GetMinReq()) - total := int(redundancy.GetTotal()) - shareSize := redundancy.GetErasureShareSize() + required := int(segmentInfo.Redundancy.RequiredShares) + total := int(segmentInfo.Redundancy.TotalShares) + shareSize := segmentInfo.Redundancy.ShareSize fec, err := infectious.NewFEC(required, total) if err != nil { @@ -797,11 +817,11 @@ func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeI } pending = append(pending, &PendingAudit{ NodeID: nodeID, - PieceID: pointer.GetRemote().RootPieceId, + PieceID: segmentInfo.RootPieceID, StripeIndex: randomIndex, ShareSize: shareSize, ExpectedShareHash: pkcrypto.SHA256Hash(share), - Path: path, + Segment: segment.SegmentLocation, }) } @@ -820,23 +840,19 @@ func rebuildStripe(ctx context.Context, fec *infectious.FEC, corrected []infecti return stripe, nil } -// GetRandomStripe takes a pointer and returns a random stripe index within that pointer. -func GetRandomStripe(ctx context.Context, pointer *pb.Pointer) (index int64, err error) { +// GetRandomStripe takes a segment and returns a random stripe index within that segment. 
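GetRandomStripe, next, no longer needs an eestream redundancy strategy: the metabase segment already carries EncryptedSize and the redundancy scheme, so the stripe count is simply the encrypted size divided by the stripe size, a short final segment falls back to stripe 0, and the chosen index later becomes a byte offset inside each piece in GetShare (offset = shareSize * stripeIndex). A worked example under assumed numbers, taking stripe size to be share size times required shares:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Assumed example values, not taken from the patch: a 2 MiB encrypted
	// segment, 256 B erasure share size, 4 required shares.
	encryptedSize := int32(2 * 1024 * 1024)
	shareSize := int32(256)
	requiredShares := int32(4)

	// Assumption: one stripe holds one share from each of the required pieces.
	stripeSize := shareSize * requiredShares // 1024 bytes

	// The last segment of an object can be smaller than one stripe;
	// in that case stripe 0 is audited.
	if encryptedSize < stripeSize {
		fmt.Println("stripe index 0")
		return
	}

	numStripes := encryptedSize / stripeSize        // 2048 stripes
	stripeIndex := rand.Int31n(numStripes)          // uniform pick in [0, numStripes)
	offset := int64(shareSize) * int64(stripeIndex) // byte offset of this stripe's share within a piece

	fmt.Println(numStripes, stripeIndex, offset)
}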
+func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error) { defer mon.Task()(&ctx)(&err) - redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy()) - if err != nil { - return 0, err - } // the last segment could be smaller than stripe size - if pointer.GetSegmentSize() < int64(redundancy.StripeSize()) { + if segment.EncryptedSize < segment.Redundancy.StripeSize() { return 0, nil } var src cryptoSource rnd := rand.New(src) - numStripes := pointer.GetSegmentSize() / int64(redundancy.StripeSize()) - randomStripeIndex := rnd.Int63n(numStripes) + numStripes := segment.EncryptedSize / segment.Redundancy.StripeSize() + randomStripeIndex := rnd.Int31n(numStripes) return randomStripeIndex, nil } diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index fc67b4448..c2472023f 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -15,7 +15,6 @@ import ( "storj.io/common/errs2" "storj.io/common/memory" - "storj.io/common/pb" "storj.io/common/peertls/tlsopts" "storj.io/common/rpc" "storj.io/common/rpc/rpcstatus" @@ -27,7 +26,6 @@ import ( "storj.io/storj/satellite" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metainfo/metabase" - "storj.io/storj/storage" "storj.io/storj/storagenode" ) @@ -50,21 +48,23 @@ func TestDownloadSharesHappyPath(t *testing.T) { err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData) require.NoError(t, err) - bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"} - audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) @@ -101,25 +101,27 @@ func TestDownloadSharesOfflineNode(t *testing.T) { err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData) require.NoError(t, err) - bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"} - audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - shareSize 
:= pointer.GetRemote().GetRedundancy().GetErasureShareSize() - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) - // stop the first node in the pointer - stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId + // stop the first node in the segment + stoppedNodeID := segment.Pieces[0].StorageNode err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID)) require.NoError(t, err) @@ -162,23 +164,25 @@ func TestDownloadSharesMissingPiece(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"} - // replace the piece id of the selected stripe with a new random one // to simulate missing piece on the storage nodes - pointer.GetRemote().RootPieceId = storj.NewPieceID() + segment.RootPieceID = storj.NewPieceID() - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) @@ -217,17 +221,18 @@ func TestDownloadSharesDialTimeout(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) - bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"} - tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil) require.NoError(t, err) @@ -245,7 +250,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) { verifier := audit.NewVerifier( satellite.Log.Named("verifier"), - satellite.Metainfo.Service, + satellite.Metainfo.Metabase, dialer, satellite.Overlay.Service, satellite.DB.Containment(), @@ -254,8 +259,9 @@ func TestDownloadSharesDialTimeout(t *testing.T) { minBytesPerSecond, 5*time.Second) - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, 
pointer, nil) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) @@ -299,17 +305,18 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData) require.NoError(t, err) - bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"} - audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - randomIndex, err := audit.GetRandomStripe(ctx, pointer) + randomIndex, err := audit.GetRandomStripe(ctx, segment) require.NoError(t, err) // This config value will create a very short timeframe allowed for receiving @@ -318,7 +325,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { verifier := audit.NewVerifier( satellite.Log.Named("verifier"), - satellite.Metainfo.Service, + satellite.Metainfo.Metabase, satellite.Dialer, satellite.Overlay.Service, satellite.DB.Containment(), @@ -327,8 +334,9 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { minBytesPerSecond, 150*time.Millisecond) - shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize() - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil) require.NoError(t, err) // make downloads on storage node slower than the timeout on the satellite for downloading shares @@ -363,16 +371,19 @@ func TestVerifierHappyPath(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) - assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces())) + assert.Len(t, report.Successes, len(segment.Pieces)) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 0) assert.Len(t, report.PendingAudits, 0) @@ -392,30 +403,21 @@ func TestVerifierExpired(t *testing.T) { ul := planet.Uplinks[0] testData := testrand.Bytes(8 * memory.KiB) - err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData) + err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour)) require.NoError(t, err) audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - // set pointer's expiration date to be 
already expired - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) - require.NoError(t, err) - oldPointerBytes, err := pb.Marshal(pointer) - require.NoError(t, err) - newPointer := &pb.Pointer{} - err = pb.Unmarshal(oldPointerBytes, newPointer) - require.NoError(t, err) - newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour) - newPointerBytes, err := pb.Marshal(newPointer) - require.NoError(t, err) - err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes) - require.NoError(t, err) + // move time into the future so the segment is expired + audits.Verifier.SetNow(func() time.Time { + return time.Now().Add(2 * time.Hour) + }) // Verify should not return an error - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) assert.Len(t, report.Successes, 0) @@ -444,21 +446,24 @@ func TestVerifierOfflineNode(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - // stop the first node in the pointer - stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId + // stop the first node in the segment + stoppedNodeID := segment.Pieces[0].StorageNode err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID)) require.NoError(t, err) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) - assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces())-1) + assert.Len(t, report.Successes, len(segment.Pieces)-1) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 1) assert.Len(t, report.PendingAudits, 0) @@ -483,21 +488,24 @@ func TestVerifierMissingPiece(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) // delete the piece from the first node - origNumPieces := len(pointer.GetRemote().GetRemotePieces()) - piece := pointer.GetRemote().GetRemotePieces()[0] - pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum) - node := planet.FindNode(piece.NodeId) + origNumPieces := len(segment.Pieces) + piece := segment.Pieces[0] + pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) + node := planet.FindNode(piece.StorageNode) err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID) require.NoError(t, err) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) assert.Len(t, report.Successes, origNumPieces-1) @@ -525,10 +533,13 @@ func TestVerifierDialTimeout(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() 
require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil) @@ -548,7 +559,7 @@ func TestVerifierDialTimeout(t *testing.T) { verifier := audit.NewVerifier( satellite.Log.Named("verifier"), - satellite.Metainfo.Service, + satellite.Metainfo.Metabase, dialer, satellite.Overlay.Service, satellite.DB.Containment(), @@ -557,12 +568,12 @@ func TestVerifierDialTimeout(t *testing.T) { minBytesPerSecond, 5*time.Second) - report, err := verifier.Verify(ctx, path, nil) + report, err := verifier.Verify(ctx, queueSegment, nil) require.True(t, audit.ErrNotEnoughShares.Has(err), "unexpected error: %+v", err) assert.Len(t, report.Successes, 0) assert.Len(t, report.Fails, 0) - assert.Len(t, report.Offlines, len(pointer.GetRemote().GetRemotePieces())) + assert.Len(t, report.Offlines, len(segment.Pieces)) assert.Len(t, report.PendingAudits, 0) }) } @@ -585,7 +596,7 @@ func TestVerifierDeletedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + segment, err := queue.Next() require.NoError(t, err) // delete the file @@ -593,7 +604,7 @@ func TestVerifierDeletedSegment(t *testing.T) { require.NoError(t, err) // Verify should not return an error, but report should be empty - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, segment, nil) require.NoError(t, err) assert.Empty(t, report) }) @@ -605,7 +616,6 @@ func TestVerifierModifiedSegment(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] audits := satellite.Audit - metainfo := satellite.Metainfo.Service audits.Worker.Loop.Pause() audits.Chore.Loop.Pause() @@ -618,20 +628,28 @@ func TestVerifierModifiedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) audits.Verifier.OnTestingCheckSegmentAlteredHook = func() { // remove one piece from the segment so that checkIfSegmentAltered fails - pointer, err := metainfo.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - pieceToRemove := pointer.Remote.RemotePieces[0] - _, err = metainfo.UpdatePieces(ctx, metabase.SegmentKey(path), pointer, nil, []*pb.RemotePiece{pieceToRemove}) + + err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + OldPieces: segment.Pieces, + NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + }) require.NoError(t, err) } // Verify should not return an error, but report should be empty - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) assert.Empty(t, report) }) @@ -655,7 +673,7 @@ func TestVerifierReplacedSegment(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + segment, err := queue.Next() 
require.NoError(t, err) audits.Verifier.OnTestingCheckSegmentAlteredHook = func() { @@ -665,7 +683,7 @@ func TestVerifierReplacedSegment(t *testing.T) { } // Verify should not return an error, but report should be empty - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, segment, nil) require.NoError(t, err) assert.Empty(t, report) }) @@ -689,26 +707,29 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) // delete the piece from the first node - origNumPieces := len(pointer.GetRemote().GetRemotePieces()) - piece := pointer.GetRemote().GetRemotePieces()[0] - pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum) - node := planet.FindNode(piece.NodeId) + origNumPieces := len(segment.Pieces) + piece := segment.Pieces[0] + pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) + node := planet.FindNode(piece.StorageNode) err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID) require.NoError(t, err) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) assert.Len(t, report.Successes, origNumPieces-1) assert.Len(t, report.Fails, 1) - assert.Equal(t, report.Fails[0], piece.NodeId) + assert.Equal(t, report.Fails[0], piece.StorageNode) assert.Len(t, report.Offlines, 0) require.Len(t, report.PendingAudits, 0) }) @@ -747,19 +768,22 @@ func TestVerifierSlowDownload(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - slowNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId) + slowNode := planet.FindNode(segment.Pieces[0].StorageNode) slowNodeDB := slowNode.DB.(*testblobs.SlowDB) // make downloads on storage node slower than the timeout on the satellite for downloading shares delay := 1 * time.Second slowNodeDB.SetLatency(delay) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) assert.NotContains(t, report.Successes, slowNode.ID()) @@ -797,18 +821,21 @@ func TestVerifierUnknownError(t *testing.T) { audits.Chore.Loop.TriggerWait() queue := audits.Queues.Fetch() - path, err := queue.Next() + queueSegment, err := queue.Next() require.NoError(t, err) - pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path)) + segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) require.NoError(t, err) - badNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId) + badNode := planet.FindNode(segment.Pieces[0].StorageNode) badNodeDB := 
badNode.DB.(*testblobs.BadDB) // return an error when the verifier attempts to download from this node badNodeDB.SetError(errs.New("unknown error")) - report, err := audits.Verifier.Verify(ctx, path, nil) + report, err := audits.Verifier.Verify(ctx, queueSegment, nil) require.NoError(t, err) require.Len(t, report.Successes, 3) diff --git a/satellite/audit/verifier_unit_test.go b/satellite/audit/verifier_unit_test.go index 9910cd7c9..603539b8f 100644 --- a/satellite/audit/verifier_unit_test.go +++ b/satellite/audit/verifier_unit_test.go @@ -7,16 +7,15 @@ import ( "context" "math/rand" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/vivint/infectious" - "storj.io/common/pb" "storj.io/common/pkcrypto" "storj.io/common/storj" "storj.io/common/testrand" + "storj.io/storj/satellite/metainfo/metabase" ) func TestFailingAudit(t *testing.T) { @@ -130,28 +129,26 @@ func TestCreatePendingAudits(t *testing.T) { contained := make(map[int]storj.NodeID) contained[1] = testNodeID - pointer := &pb.Pointer{ - CreationDate: time.Now(), - Type: pb.Pointer_REMOTE, - Remote: &pb.RemoteSegment{ - RootPieceId: storj.NewPieceID(), - Redundancy: &pb.RedundancyScheme{ - MinReq: 8, - Total: 14, - ErasureShareSize: int32(len(shares[0].Data)), - }, + segment := testSegment("test") + segmentInfo := metabase.Segment{ + StreamID: segment.StreamID, + RootPieceID: testrand.PieceID(), + Redundancy: storj.RedundancyScheme{ + Algorithm: storj.ReedSolomon, + RequiredShares: required, + TotalShares: total, + ShareSize: int32(len(shares[0].Data)), }, } + randomIndex := rand.Int31n(10) - randomIndex := rand.Int63n(10) - - pending, err := createPendingAudits(ctx, contained, shares, pointer, randomIndex, "") + pending, err := createPendingAudits(ctx, contained, shares, segment, segmentInfo, randomIndex) require.NoError(t, err) require.Equal(t, 1, len(pending)) assert.Equal(t, testNodeID, pending[0].NodeID) - assert.Equal(t, pointer.Remote.RootPieceId, pending[0].PieceID) + assert.Equal(t, segmentInfo.RootPieceID, pending[0].PieceID) assert.Equal(t, randomIndex, pending[0].StripeIndex) - assert.Equal(t, pointer.Remote.Redundancy.ErasureShareSize, pending[0].ShareSize) + assert.Equal(t, segmentInfo.Redundancy.ShareSize, pending[0].ShareSize) assert.Equal(t, pkcrypto.SHA256Hash(shares[1].Data), pending[0].ExpectedShareHash) assert.EqualValues(t, 0, pending[0].ReverifyCount) } diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go index 2d9f5ff66..28f3d558b 100644 --- a/satellite/audit/worker.go +++ b/satellite/audit/worker.go @@ -28,7 +28,7 @@ type Config struct { ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m"` QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m"` Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"` - WorkerConcurrency int `help:"number of workers to run audits on paths" default:"2"` + WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"` } // Worker contains information for populating audit queue and processing audits. @@ -86,7 +86,7 @@ func (worker *Worker) process(ctx context.Context) (err error) { worker.limiter.Wait() for { - path, err := queue.Next() + segment, err := queue.Next() if err != nil { if ErrEmptyQueue.Has(err) { // get a new queue and return if empty; otherwise continue working. 
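Taking the worker changes together: the queue now returns Segments, the worker drains it with Next until ErrEmptyQueue, each entry is audited under the concurrency limiter (Reverify of contained nodes first, then Verify with the skip list), and RecordAudits drops its path argument. A rough, self-contained skeleton of that loop shape with placeholder types, not the satellite's:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// segmentRef is a stand-in for the queue's Segment entries
// (a stream ID plus position instead of an encrypted path).
type segmentRef struct {
	streamID string
	position uint64
}

var errEmptyQueue = errors.New("empty audit queue")

// queue is a stand-in for the audit queue; Next pops until the queue is drained.
type queue struct{ items []segmentRef }

func (q *queue) Next() (segmentRef, error) {
	if len(q.items) == 0 {
		return segmentRef{}, errEmptyQueue
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// auditOne stands in for the per-segment work: reverify the contained nodes,
// then verify the rest, then hand the combined report to the reporter.
func auditOne(seg segmentRef) error {
	fmt.Println("auditing", seg.streamID, "position", seg.position)
	return nil
}

func main() {
	q := &queue{items: []segmentRef{{streamID: "s1", position: 0}, {streamID: "s2", position: 3}}}

	var wg sync.WaitGroup
	for {
		seg, err := q.Next()
		if err != nil {
			if errors.Is(err, errEmptyQueue) {
				break // the real worker fetches a fresh queue here instead of exiting
			}
			fmt.Println("queue error:", err)
			break
		}
		wg.Add(1)
		// The real worker bounds this with a concurrency limiter (WorkerConcurrency).
		go func(seg segmentRef) {
			defer wg.Done()
			if err := auditOne(seg); err != nil {
				fmt.Println("audit failed for segment:", err)
			}
		}(seg)
	}
	wg.Wait()
}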
@@ -100,27 +100,27 @@ func (worker *Worker) process(ctx context.Context) (err error) { } worker.limiter.Go(ctx, func() { - err := worker.work(ctx, path) + err := worker.work(ctx, segment) if err != nil { - worker.log.Error("audit failed", zap.Binary("Segment", []byte(path)), zap.Error(err)) + worker.log.Error("audit failed", zap.ByteString("Segment", []byte(segment.Encode())), zap.Error(err)) } }) } } -func (worker *Worker) work(ctx context.Context, path storj.Path) (err error) { +func (worker *Worker) work(ctx context.Context, segment Segment) (err error) { defer mon.Task()(&ctx)(&err) var errlist errs.Group // First, attempt to reverify nodes for this segment that are in containment mode. - report, err := worker.verifier.Reverify(ctx, path) + report, err := worker.verifier.Reverify(ctx, segment) if err != nil { errlist.Add(err) } // TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update - _, err = worker.reporter.RecordAudits(ctx, report, path) + _, err = worker.reporter.RecordAudits(ctx, report) if err != nil { errlist.Add(err) } @@ -144,13 +144,13 @@ func (worker *Worker) work(ctx context.Context, path storj.Path) (err error) { } // Next, audit the the remaining nodes that are not in containment mode. - report, err = worker.verifier.Verify(ctx, path, skip) + report, err = worker.verifier.Verify(ctx, segment, skip) if err != nil { errlist.Add(err) } // TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update - _, err = worker.reporter.RecordAudits(ctx, report, path) + _, err = worker.reporter.RecordAudits(ctx, report) if err != nil { errlist.Add(err) } diff --git a/satellite/core.go b/satellite/core.go index a2e4be25c..7afdaf5a2 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -311,7 +311,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Audit.Queues = audit.NewQueues() peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"), - peer.Metainfo.Service, + peer.Metainfo.Metabase, peer.Dialer, peer.Overlay.Service, peer.DB.Containment(), diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 80082f095..0cb547b3e 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -202,8 +202,7 @@ type MetabaseDB interface { // BucketEmpty returns true if bucket does not contain objects (pending or committed). // This method doesn't check bucket existence. BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error) - // UpdateSegmentPieces updates pieces for specified segment. If provided old pieces - // won't match current database state update will fail. + // UpdateSegmentPieces updates pieces for specified segment. If provided old pieces won't match current database state update will fail. UpdateSegmentPieces(ctx context.Context, opts metabase.UpdateSegmentPieces) (err error) // TestingAllCommittedObjects gets all committed objects from bucket. Use only for testing purposes. @@ -212,7 +211,6 @@ type MetabaseDB interface { TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error) // TestingAllObjectSegments gets all segments for given object. Use only for testing purposes. TestingAllObjectSegments(ctx context.Context, objectLocation metabase.ObjectLocation) (segments []metabase.Segment, err error) - // TestingAllObjects gets all objects. Use only for testing purposes. 
TestingAllObjects(ctx context.Context) (segments []metabase.Object, err error) // TestingAllSegments gets all segments. Use only for testing purposes. diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index b2fb42ad1..eab06abf1 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -49,14 +49,14 @@ type Segment struct { RootPieceID storj.PieceID // gc, graceful exit Pieces metabase.Pieces // tally, audit, gc, graceful exit, repair CreationDate time.Time // repair - expirationDate time.Time // tally, repair + ExpirationDate time.Time // tally, repair LastRepaired time.Time // repair Pointer *pb.Pointer // repair } // Expired checks if segment is expired relative to now. func (segment *Segment) Expired(now time.Time) bool { - return !segment.expirationDate.IsZero() && segment.expirationDate.Before(now) + return !segment.ExpirationDate.IsZero() && segment.ExpirationDate.Before(now) } // Observer is an interface defining an observer that can subscribe to the metainfo loop. @@ -485,7 +485,7 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta } if expiresAt != nil { - loopSegment.expirationDate = *expiresAt + loopSegment.ExpirationDate = *expiresAt } loopSegment.StreamID = segment.StreamID diff --git a/satellite/orders/service.go b/satellite/orders/service.go index e96fe70b2..cd8b5d574 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -137,19 +137,19 @@ func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.Buc return nil } -// CreateGetOrderLimits creates the order limits for downloading the pieces of pointer. -func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { +// CreateGetOrderLimits creates the order limits for downloading the pieces of a segment. 
+func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) - redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy()) + redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } - pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy) + pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy) - nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces())) - for i, piece := range pointer.GetRemote().GetRemotePieces() { - nodeIDs[i] = piece.NodeId + nodeIDs := make([]storj.NodeID, len(segment.Pieces)) + for i, piece := range segment.Pieces { + nodeIDs[i] = piece.StorageNode } nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs) @@ -158,17 +158,17 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } - signer, err := NewSignerGet(service, pointer.GetRemote().RootPieceId, time.Now(), pieceSize, bucket) + signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), pieceSize, bucket) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } - neededLimits := pb.NewRedundancySchemeToStorj(pointer.GetRemote().GetRedundancy()).DownloadNodes() + neededLimits := segment.Redundancy.DownloadNodes() - pieces := pointer.GetRemote().GetRemotePieces() + pieces := segment.Pieces for _, pieceIndex := range service.perm(len(pieces)) { piece := pieces[pieceIndex] - node, ok := nodes[piece.NodeId] + node, ok := nodes[piece.StorageNode] if !ok { continue } @@ -179,9 +179,9 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas } _, err := signer.Sign(ctx, storj.NodeURL{ - ID: piece.NodeId, + ID: piece.StorageNode, Address: address, - }, piece.PieceNum) + }, int32(piece.Number)) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -207,7 +207,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas return signer.AddressedLimits, signer.PrivateKey, nil } -// CreateGetOrderLimits2 creates the order limits for downloading the pieces of pointer. +// CreateGetOrderLimits2 creates the order limits for downloading the pieces of a segment. func (service *Service) CreateGetOrderLimits2(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) @@ -365,17 +365,13 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucket meta return signer.AddressedLimits, signer.PrivateKey, nil } -// CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer. -func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) { +// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment. 
@@ -365,17 +365,13 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucket meta
 	return signer.AddressedLimits, signer.PrivateKey, nil
 }
 
-// CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.
-func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
+// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.
+func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	redundancy := pointer.GetRemote().GetRedundancy()
-	shareSize := redundancy.GetErasureShareSize()
-	totalPieces := redundancy.GetTotal()
-
-	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
-	for i, piece := range pointer.GetRemote().GetRemotePieces() {
-		nodeIDs[i] = piece.NodeId
+	nodeIDs := make([]storj.NodeID, len(segment.Pieces))
+	for i, piece := range segment.Pieces {
+		nodeIDs[i] = piece.StorageNode
 	}
 
 	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
@@ -384,43 +380,43 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
 		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
 	}
 
-	signer, err := NewSignerAudit(service, pointer.GetRemote().RootPieceId, time.Now(), int64(shareSize), bucket)
+	signer, err := NewSignerAudit(service, segment.RootPieceID, time.Now(), int64(segment.Redundancy.ShareSize), bucket)
 	if err != nil {
 		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
 	}
 
 	cachedIPsAndPorts = make(map[storj.NodeID]string)
 	var nodeErrors errs.Group
-	var limitsCount int32
-	limits := make([]*pb.AddressedOrderLimit, totalPieces)
-	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		if skip[piece.NodeId] {
+	var limitsCount int16
+	limits := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)
+	for _, piece := range segment.Pieces {
+		if skip[piece.StorageNode] {
 			continue
 		}
 
-		node, ok := nodes[piece.NodeId]
+		node, ok := nodes[piece.StorageNode]
 		if !ok {
-			nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
+			nodeErrors.Add(errs.New("node %q is not reliable", piece.StorageNode))
 			continue
 		}
 
 		address := node.Address.Address
 		if node.LastIPPort != "" {
-			cachedIPsAndPorts[piece.NodeId] = node.LastIPPort
+			cachedIPsAndPorts[piece.StorageNode] = node.LastIPPort
 		}
 
 		limit, err := signer.Sign(ctx, storj.NodeURL{
-			ID:      piece.NodeId,
+			ID:      piece.StorageNode,
 			Address: address,
-		}, piece.PieceNum)
+		}, int32(piece.Number))
 		if err != nil {
 			return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
 		}
 
-		limits[piece.GetPieceNum()] = limit
+		limits[piece.Number] = limit
 		limitsCount++
 	}
 
-	if limitsCount < redundancy.GetMinReq() {
-		err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq())
+	if limitsCount < segment.Redundancy.RequiredShares {
+		err = Error.New("not enough nodes available: got %d, required %d", limitsCount, segment.Redundancy.RequiredShares)
 		return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
 	}
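
Note: the audit path mirrors the GET path; everything previously pulled from the pointer's redundancy proto now comes from segment.Redundancy, and the limits slice stays indexed by piece number. A hedged verifier-style sketch (the example package and usableAuditLimits helper are illustrative names, not part of the codebase):

    package example // hypothetical caller, not part of this change

    import (
        "context"

        "storj.io/common/pb"
        "storj.io/common/storj"
        "storj.io/storj/satellite/metainfo/metabase"
        "storj.io/storj/satellite/orders"
    )

    // usableAuditLimits requests audit order limits for a segment, skipping
    // nodes that are already contained, and keeps only the non-nil entries.
    // The service returns a slice of length TotalShares indexed by piece
    // number, with nil entries for skipped or offline nodes.
    func usableAuditLimits(ctx context.Context, service *orders.Service, bucket metabase.BucketLocation, segment metabase.Segment, contained map[storj.NodeID]bool) ([]*pb.AddressedOrderLimit, error) {
        limits, _, _, err := service.CreateAuditOrderLimits(ctx, bucket, segment, contained)
        if err != nil {
            // Includes "not enough nodes available" when fewer than
            // segment.Redundancy.RequiredShares limits could be signed.
            return nil, err
        }
        usable := make([]*pb.AddressedOrderLimit, 0, len(limits))
        for _, limit := range limits {
            if limit != nil {
                usable = append(usable, limit)
            }
        }
        return usable, nil
    }
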
@@ -435,8 +431,8 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
 	return limits, signer.PrivateKey, cachedIPsAndPorts, nil
 }
 
-// CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer.
-func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
+// CreateAuditOrderLimit creates an order limit for auditing a single piece of a segment.
+func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
 	// TODO reduce number of params ?
 	defer mon.Task()(&ctx)(&err)
@@ -462,7 +458,7 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba
 	orderLimit, err := signer.Sign(ctx, storj.NodeURL{
 		ID:      nodeID,
 		Address: node.Address.Address,
-	}, pieceNum)
+	}, int32(pieceNum))
 	if err != nil {
 		return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
 	}
diff --git a/satellite/orders/service_test.go b/satellite/orders/service_test.go
index 3a557759a..4472b688f 100644
--- a/satellite/orders/service_test.go
+++ b/satellite/orders/service_test.go
@@ -31,13 +31,14 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) {
 	)
 
 	// Setup: Upload an object and create order limits
 	require.NoError(t, uplinkPeer.Upload(ctx, satellitePeer, bucketName, filePath, testrand.Bytes(5*memory.KiB)))
+	bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
-	items, _, err := satellitePeer.Metainfo.Service.List(ctx, metabase.SegmentKey{}, "", true, 10, ^uint32(0))
+
+	segments, err := satellitePeer.Metainfo.Metabase.TestingAllSegments(ctx)
 	require.NoError(t, err)
-	require.Equal(t, 1, len(items))
-	pointer, err := satellitePeer.Metainfo.Service.Get(ctx, metabase.SegmentKey(items[0].Path))
-	require.NoError(t, err)
-	limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, pointer)
+	require.Equal(t, 1, len(segments))
+
+	limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0])
 	require.NoError(t, err)
 	require.Equal(t, 2, len(limits))
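
Note: the test now reads segments straight from the metabase via the test-only TestingAllSegments accessor declared at the top of this change, instead of listing pointer keys. A hedged helper sketch; segmentLister and firstSegmentLimits are illustrative names, and the TestingAllSegments return type is inferred from its usage above:

    package example // hypothetical test helper, not part of this change

    import (
        "context"
        "testing"

        "github.com/stretchr/testify/require"

        "storj.io/common/pb"
        "storj.io/storj/satellite/metainfo/metabase"
        "storj.io/storj/satellite/orders"
    )

    // segmentLister is the small slice of the metainfo metabase interface
    // this sketch needs.
    type segmentLister interface {
        TestingAllSegments(ctx context.Context) ([]metabase.Segment, error)
    }

    // firstSegmentLimits fetches all segments and builds GET order limits
    // for the first one, as the test above does.
    func firstSegmentLimits(ctx context.Context, t *testing.T, db segmentLister, service *orders.Service, bucket metabase.BucketLocation) []*pb.AddressedOrderLimit {
        segments, err := db.TestingAllSegments(ctx)
        require.NoError(t, err)
        require.NotEmpty(t, segments)

        limits, _, err := service.CreateGetOrderLimits(ctx, bucket, segments[0])
        require.NoError(t, err)
        return limits
    }
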
diff --git a/satellite/overlay/suspension_test.go b/satellite/overlay/suspension_test.go
index bba9595be..f5fa2db05 100644
--- a/satellite/overlay/suspension_test.go
+++ b/satellite/overlay/suspension_test.go
@@ -172,7 +172,7 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) {
 		Unknown:  storj.NodeIDList{unknownNodeID},
 	}
 	auditService := planet.Satellites[0].Audit
-	_, err := auditService.Reporter.RecordAudits(ctx, report, "")
+	_, err := auditService.Reporter.RecordAudits(ctx, report)
 	require.NoError(t, err)
 
 	// success and offline nodes should not be disqualified
@@ -228,7 +228,7 @@ func TestAuditSuspendDQDisabled(t *testing.T) {
 		Unknown:  storj.NodeIDList{unknownNodeID},
 	}
 	auditService := planet.Satellites[0].Audit
-	_, err := auditService.Reporter.RecordAudits(ctx, report, "")
+	_, err := auditService.Reporter.RecordAudits(ctx, report)
 	require.NoError(t, err)
 
 	// successful node should not be suspended or disqualified
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index c11ac3667..5d3ec204b 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -454,15 +454,15 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
 	satellite.Audit.Chore.Loop.TriggerWait()
 	queue := satellite.Audit.Queues.Fetch()
 	require.EqualValues(t, queue.Size(), 1)
-	encryptedPath, err := queue.Next()
+	queueSegment, err := queue.Next()
 	require.NoError(t, err)
 
 	// replace pointer with one that is already expired
 	pointer := &pb.Pointer{}
 	pointer.ExpirationDate = time.Now().Add(-time.Hour)
-	err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, metabase.SegmentKey(encryptedPath))
+	err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, queueSegment.Encode())
 	require.NoError(t, err)
-	err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, metabase.SegmentKey(encryptedPath), pointer)
+	err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, queueSegment.Encode(), pointer)
 	require.NoError(t, err)
 
 	// Verify that the segment is on the repair queue
diff --git a/satellite/satellitedb/containment.go b/satellite/satellitedb/containment.go
index 1ecaaa11d..b21022def 100644
--- a/satellite/satellitedb/containment.go
+++ b/satellite/satellitedb/containment.go
@@ -14,6 +14,7 @@ import (
 	"storj.io/common/pb"
 	"storj.io/common/storj"
 	"storj.io/storj/satellite/audit"
+	"storj.io/storj/satellite/metainfo/metabase"
 	"storj.io/storj/satellite/satellitedb/dbx"
 )
 
@@ -51,13 +52,13 @@ func (containment *containment) IncrementPending(ctx context.Context, pendingAud
 			VALUES (?, ?, ?, ?, ?, ?, ?)`,
 		)
 		_, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes(), pendingAudit.PieceID.Bytes(), pendingAudit.StripeIndex,
-			pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Path))
+			pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Segment.Encode()))
 		if err != nil {
 			return err
 		}
 	case nil:
 		if !bytes.Equal(existingAudit.ExpectedShareHash, pendingAudit.ExpectedShareHash) {
-			containment.db.log.Info("pending audit already exists", zap.String("node id", pendingAudit.NodeID.String()), zap.Binary("segment", []byte(pendingAudit.Path)))
+			containment.db.log.Info("pending audit already exists", zap.String("node id", pendingAudit.NodeID.String()), zap.ByteString("segment", []byte(pendingAudit.Segment.Encode())))
 			return nil
 		}
 		statement := containment.db.Rebind(
@@ -119,14 +120,19 @@ func convertDBPending(ctx context.Context, info *dbx.PendingAudits) (_ *audit.Pe
 		return nil, audit.ContainError.Wrap(err)
 	}
 
+	segment, err := metabase.ParseSegmentKey(info.Path)
+	if err != nil {
+		return nil, audit.ContainError.Wrap(err)
+	}
+
 	pending := &audit.PendingAudit{
 		NodeID:            nodeID,
 		PieceID:           pieceID,
-		StripeIndex:       info.StripeIndex,
+		StripeIndex:       int32(info.StripeIndex),
 		ShareSize:         int32(info.ShareSize),
 		ExpectedShareHash: info.ExpectedShareHash,
 		ReverifyCount:     int32(info.ReverifyCount),
-		Path:              string(info.Path),
+		Segment:           segment,
 	}
 
 	return pending, nil
 }
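
Note: the pending_audits.path column keeps storing the encoded segment key; only the Go representation of the pending audit changes. A hedged round-trip sketch, assuming the Segment field holds a metabase.SegmentLocation (the type ParseSegmentKey returns); the helper name is illustrative:

    package example // hypothetical, not part of this change

    import "storj.io/storj/satellite/metainfo/metabase"

    // segmentKeyRoundTrip mirrors the conversion above: the pending audit's
    // segment location is written as its encoded segment key and parsed back
    // with metabase.ParseSegmentKey when the row is read.
    func segmentKeyRoundTrip(loc metabase.SegmentLocation) (metabase.SegmentLocation, error) {
        key := loc.Encode() // []byte stored in the pending_audits.path column
        return metabase.ParseSegmentKey(key)
    }
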