satellite/audits: migrate to metabase

Change-Id: I480c941820c5b0bd3af0539d92b548189211acb2
Kaloyan Raev 2020-12-14 14:54:22 +02:00
parent 2381ca2810
commit 9aa61245d0
25 changed files with 711 additions and 620 deletions

View File

@ -10,7 +10,6 @@ import (
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/metainfo"
)
@ -53,30 +52,30 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return err
}
pathCollector := NewPathCollector(chore.config.Slots, chore.rand)
err = chore.metainfoLoop.Join(ctx, pathCollector)
collector := NewCollector(chore.config.Slots, chore.rand)
err = chore.metainfoLoop.Join(ctx, collector)
if err != nil {
chore.log.Error("error joining metainfoloop", zap.Error(err))
return nil
}
var newQueue []storj.Path
queuePaths := make(map[storj.Path]struct{})
var newQueue []Segment
queueSegments := make(map[Segment]struct{})
// Add reservoir paths to queue in pseudorandom order.
// Add reservoir segments to queue in pseudorandom order.
for i := 0; i < chore.config.Slots; i++ {
for _, res := range pathCollector.Reservoirs {
// Skip reservoir if no path at this index.
if len(res.Paths) <= i {
for _, res := range collector.Reservoirs {
// Skip reservoir if no segment at this index.
if len(res.Segments) <= i {
continue
}
path := res.Paths[i]
if path == "" {
segment := res.Segments[i]
if segment == (Segment{}) {
continue
}
if _, ok := queuePaths[path]; !ok {
newQueue = append(newQueue, path)
queuePaths[path] = struct{}{}
if _, ok := queueSegments[segment]; !ok {
newQueue = append(newQueue, segment)
queueSegments[segment] = struct{}{}
}
}
}
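The pseudorandom ordering noted in the comment above comes from ranging over the Reservoirs map, whose iteration order the Go runtime randomizes; each pass takes slot i from every reservoir before moving on to slot i+1. A hedged sketch of the hand-off that follows this loop (the chore.queues field and its Push call are assumptions based on how workers fetch queues elsewhere in this package):

	// newQueue now holds every sampled segment exactly once, in pseudorandom order.
	if err := chore.queues.Push(newQueue); err != nil {
		chore.log.Error("error pushing new queue to queues", zap.Error(err))
	}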

View File

@ -11,20 +11,18 @@ import (
"storj.io/storj/satellite/metainfo"
)
var _ metainfo.Observer = (*PathCollector)(nil)
var _ metainfo.Observer = (*Collector)(nil)
// PathCollector uses the metainfo loop to add paths to node reservoirs.
//
// architecture: Observer
type PathCollector struct {
// Collector uses the metainfo loop to add segments to node reservoirs.
type Collector struct {
Reservoirs map[storj.NodeID]*Reservoir
slotCount int
rand *rand.Rand
}
// NewPathCollector instantiates a path collector.
func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector {
return &PathCollector{
// NewCollector instantiates a segment collector.
func NewCollector(reservoirSlots int, r *rand.Rand) *Collector {
return &Collector{
Reservoirs: make(map[storj.NodeID]*Reservoir),
slotCount: reservoirSlots,
rand: r,
@ -32,24 +30,22 @@ func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector {
}
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
// TODO change Sample to accept SegmentLocation
key := string(segment.Location.Encode())
func (collector *Collector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
for _, piece := range segment.Pieces {
if _, ok := collector.Reservoirs[piece.StorageNode]; !ok {
collector.Reservoirs[piece.StorageNode] = NewReservoir(collector.slotCount)
}
collector.Reservoirs[piece.StorageNode].Sample(collector.rand, key)
collector.Reservoirs[piece.StorageNode].Sample(collector.rand, NewSegment(segment))
}
return nil
}
// Object returns nil because the audit service does not interact with objects.
func (collector *PathCollector) Object(ctx context.Context, object *metainfo.Object) (err error) {
func (collector *Collector) Object(ctx context.Context, object *metainfo.Object) (err error) {
return nil
}
// InlineSegment returns nil because we're only auditing for storage nodes for now.
func (collector *PathCollector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
func (collector *Collector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
return nil
}
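Where the old PathCollector keyed its reservoir samples by the encoded segment location, the new Collector samples a typed value that carries everything the audit later needs. A brief illustration of the change inside RemoteSegment (the commented line shows the pre-migration call):

	// before this commit, reservoirs held opaque encoded location keys:
	//   collector.Reservoirs[piece.StorageNode].Sample(collector.rand, string(segment.Location.Encode()))
	// after it, they hold typed segments that keep the stream ID and expiration date:
	collector.Reservoirs[piece.StorageNode].Sample(collector.rand, NewSegment(segment))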

View File

@ -13,14 +13,13 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/audit"
)
// TestAuditPathCollector does the following:
// TestAuditCollector does the following:
// - start testplanet with 5 nodes and a reservoir size of 3
// - upload 5 files
// - iterate over all the segments in satellite.Metainfo and store them in allPieces map
@ -30,7 +29,7 @@ import (
// - expect that there is a reservoir for that node on the audit observer
// - that the reservoir size is <= 2 (the maxReservoirSize)
// - that every item in the reservoir is unique
func TestAuditPathCollector(t *testing.T) {
func TestAuditCollector(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@ -51,23 +50,23 @@ func TestAuditPathCollector(t *testing.T) {
}
r := rand.New(rand.NewSource(time.Now().Unix()))
observer := audit.NewPathCollector(4, r)
observer := audit.NewCollector(4, r)
err := satellite.Metainfo.Loop.Join(ctx, observer)
require.NoError(t, err)
for _, node := range planet.StorageNodes {
// expect a reservoir for every node
require.NotNil(t, observer.Reservoirs[node.ID()])
require.True(t, len(observer.Reservoirs[node.ID()].Paths) > 1)
require.True(t, len(observer.Reservoirs[node.ID()].Segments) > 1)
// Require that len paths are <= 3 even though the PathCollector was instantiated with 4
// Require that len segments are <= 3 even though the Collector was instantiated with 4
// because the maxReservoirSize is currently 3.
require.True(t, len(observer.Reservoirs[node.ID()].Paths) <= 3)
require.True(t, len(observer.Reservoirs[node.ID()].Segments) <= 3)
repeats := make(map[storj.Path]bool)
for _, path := range observer.Reservoirs[node.ID()].Paths {
assert.False(t, repeats[path], "expected every item in reservoir to be unique")
repeats[path] = true
repeats := make(map[audit.Segment]bool)
for _, segment := range observer.Reservoirs[node.ID()].Segments {
assert.False(t, repeats[segment], "expected every item in reservoir to be unique")
repeats[segment] = true
}
}
})

View File

@ -10,6 +10,7 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/metainfo/metabase"
)
var (
@ -27,11 +28,11 @@ var (
type PendingAudit struct {
NodeID storj.NodeID
PieceID storj.PieceID
StripeIndex int64
StripeIndex int32
ShareSize int32
ExpectedShareHash []byte
ReverifyCount int32
Path storj.Path
Segment metabase.SegmentLocation
}
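The pending audit now records a metabase.SegmentLocation instead of an encoded path string. A sketch of constructing one, mirroring the reverify tests later in this commit (queueSegment is the item popped from the audit queue and segment is the row fetched from the metabase):

	pending := &audit.PendingAudit{
		NodeID:            segment.Pieces[0].StorageNode,
		PieceID:           segment.RootPieceID,
		StripeIndex:       randomIndex, // now int32
		ShareSize:         segment.Redundancy.ShareSize,
		ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
		Segment:           queueSegment.SegmentLocation,
	}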
// Containment holds information about pending audits for contained nodes.

View File

@ -21,7 +21,6 @@ import (
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/uplink/private/storage/meta"
)
// TestDisqualificationTooManyFailedAudits does the following:
@ -63,7 +62,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
// failed audits.
iterations := 1
for ; ; iterations++ {
_, err := satellitePeer.Audit.Reporter.RecordAudits(ctx, report, "")
_, err := satellitePeer.Audit.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
dossier, err := satellitePeer.Overlay.Service.Get(ctx, nodeID)
@ -124,20 +123,19 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) {
bucket := metabase.BucketLocation{ProjectID: uplinkPeer.Projects[0].ID, BucketName: "testbucket"}
items, _, err := satellitePeer.Metainfo.Service.List(ctx, metabase.SegmentKey{}, "", true, 10, meta.All)
segments, err := satellitePeer.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(items))
require.Equal(t, 1, len(segments))
pointer, err := satellitePeer.Metainfo.Service.Get(ctx, metabase.SegmentKey(items[0].Path))
require.NoError(t, err)
segment := segments[0]
disqualifiedNode := segment.Pieces[0].StorageNode
disqualifiedNode := pointer.GetRemote().GetRemotePieces()[0].NodeId
err = satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode)
require.NoError(t, err)
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, pointer)
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment)
require.NoError(t, err)
assert.Len(t, limits, len(pointer.GetRemote().GetRemotePieces())-1)
assert.Len(t, limits, len(segment.Pieces)-1)
for _, orderLimit := range limits {
assert.False(t, isDisqualified(t, ctx, satellitePeer, orderLimit.Limit.StorageNodeId))

View File

@ -58,7 +58,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo
verifier := audit.NewVerifier(
zaptest.NewLogger(t).Named("a-special-verifier"),
sat.Metainfo.Service,
sat.Metainfo.Metabase,
newDialer,
sat.Overlay.Service,
sat.DB.Containment(),
@ -89,16 +89,16 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := testSatellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, nil)
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
// find any non-nil limit
@ -116,7 +116,7 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
mock := &mockConnector{}
verifier := reformVerifierWithMockConnector(t, testSatellite, mock)
share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, shareSize, orderNum)
share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
require.NoError(t, err)
require.NoError(t, share.Error)
@ -143,16 +143,16 @@ func TestGetSharePrefers(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := testSatellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, nil)
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
require.GreaterOrEqual(t, len(orderLimits), 1)
@ -179,7 +179,7 @@ func TestGetSharePrefers(t *testing.T) {
}
verifier := reformVerifierWithMockConnector(t, testSatellite, mock)
share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, shareSize, orderNum)
share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
require.NoError(t, err)
require.NoError(t, share.Error)

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@ -40,23 +39,23 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
queue := audits.Queues.Fetch()
require.EqualValues(t, 2, queue.Size(), "audit queue")
uniquePaths := make(map[storj.Path]struct{})
uniqueSegments := make(map[audit.Segment]struct{})
var err error
var path storj.Path
var pathCount int
var segment audit.Segment
var segmentCount int
for {
path, err = queue.Next()
segment, err = queue.Next()
if err != nil {
break
}
pathCount++
_, ok := uniquePaths[path]
require.False(t, ok, "expected unique path in chore queue")
segmentCount++
_, ok := uniqueSegments[segment]
require.False(t, ok, "expected unique segment in chore queue")
uniquePaths[path] = struct{}{}
uniqueSegments[segment] = struct{}{}
}
require.True(t, audit.ErrEmptyQueue.Has(err))
require.Equal(t, 2, pathCount)
require.Equal(t, 2, segmentCount)
require.Equal(t, 0, queue.Size())
// Repopulate the queue for the worker.

View File

@ -8,30 +8,28 @@ import (
"sync"
"github.com/zeebo/errs"
"storj.io/common/storj"
)
// ErrEmptyQueue is used to indicate that the queue is empty.
var ErrEmptyQueue = errs.Class("empty audit queue")
// Queue is a list of paths to audit, shared between the reservoir chore and audit workers.
// Queue is a list of segments to audit, shared between the reservoir chore and audit workers.
// It is not safe for concurrent use.
type Queue struct {
queue []storj.Path
queue []Segment
}
// NewQueue creates a new audit queue.
func NewQueue(paths []storj.Path) *Queue {
func NewQueue(segments []Segment) *Queue {
return &Queue{
queue: paths,
queue: segments,
}
}
// Next gets the next item in the queue.
func (q *Queue) Next() (storj.Path, error) {
func (q *Queue) Next() (Segment, error) {
if len(q.queue) == 0 {
return "", ErrEmptyQueue.New("")
return Segment{}, ErrEmptyQueue.New("")
}
next := q.queue[0]
@ -60,7 +58,7 @@ type Queues struct {
// NewQueues creates a new Queues object.
func NewQueues() *Queues {
queues := &Queues{
nextQueue: NewQueue([]storj.Path{}),
nextQueue: NewQueue([]Segment{}),
}
return queues
}
@ -78,7 +76,7 @@ func (queues *Queues) Fetch() *Queue {
if queues.swapQueue != nil {
queues.swapQueue()
} else {
queues.nextQueue = NewQueue([]storj.Path{})
queues.nextQueue = NewQueue([]Segment{})
}
return active
@ -88,7 +86,7 @@ func (queues *Queues) Fetch() *Queue {
// Push adds a pending queue to be swapped in when ready.
// If nextQueue is empty, it immediately replaces the queue. Otherwise it creates a swapQueue callback to be called when nextQueue is fetched.
// Only one call to Push is permitted at a time, otherwise it will return ErrPendingQueueInProgress.
func (queues *Queues) Push(pendingQueue []storj.Path) error {
func (queues *Queues) Push(pendingQueue []Segment) error {
queues.mu.Lock()
defer queues.mu.Unlock()

View File

@ -11,8 +11,9 @@ import (
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/metabase"
)
func TestQueues(t *testing.T) {
@ -25,7 +26,7 @@ func TestQueues(t *testing.T) {
_, err := q.Next()
require.True(t, ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
testQueue1 := []storj.Path{"a", "b", "c"}
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
err = queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
@ -47,14 +48,14 @@ func TestQueuesPush(t *testing.T) {
queues := NewQueues()
// when next queue is empty, WaitForSwap should return immediately
testQueue1 := []storj.Path{"a", "b", "c"}
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
err := queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
require.NoError(t, err)
// second call to WaitForSwap should block until Fetch is called the first time
testQueue2 := []storj.Path{"d", "e"}
testQueue2 := []Segment{testSegment("d"), testSegment("e")}
err = queues.Push(testQueue2)
require.NoError(t, err)
var group errgroup.Group
@ -86,14 +87,14 @@ func TestQueuesPushCancel(t *testing.T) {
queues := NewQueues()
// when queue is empty, WaitForSwap should return immediately
testQueue1 := []storj.Path{"a", "b", "c"}
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
err := queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
require.NoError(t, err)
ctxWithCancel, cancel := context.WithCancel(ctx)
testQueue2 := []storj.Path{"d", "e"}
testQueue2 := []Segment{testSegment("d"), testSegment("e")}
err = queues.Push(testQueue2)
require.NoError(t, err)
var group errgroup.Group
@ -112,3 +113,14 @@ func TestQueuesPushCancel(t *testing.T) {
err = group.Wait()
require.NoError(t, err)
}
func testSegment(objectKey string) Segment {
return Segment{
SegmentLocation: metabase.SegmentLocation{
ProjectID: testrand.UUID(),
BucketName: "test",
ObjectKey: metabase.ObjectKey(objectKey),
},
StreamID: testrand.UUID(),
}
}

View File

@ -46,7 +46,7 @@ func NewReporter(log *zap.Logger, overlay *overlay.Service, containment Containm
// RecordAudits saves audit results to overlay. When no error, it returns
// nil for both return values, otherwise it returns the report with the fields
// set to the values which have been saved and the error.
func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path storj.Path) (_ Report, err error) {
func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error) {
defer mon.Task()(&ctx)(&err)
successes := req.Successes
@ -241,7 +241,7 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits
if len(failed) > 0 {
for _, v := range failed {
reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path))
reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.ByteString("Segment Location", []byte(v.Segment.Encode())))
}
return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
}

View File

@ -42,7 +42,7 @@ func TestReportPendingAudits(t *testing.T) {
overlay := satellite.Overlay.Service
containment := satellite.DB.Containment()
failed, err := audits.Reporter.RecordAudits(ctx, report, "")
failed, err := audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
assert.Zero(t, failed)
@ -69,7 +69,7 @@ func TestRecordAuditsAtLeastOnce(t *testing.T) {
report := audit.Report{Successes: []storj.NodeID{nodeID}}
// expect RecordAudits to try recording at least once (maxRetries is set to 0)
failed, err := audits.Reporter.RecordAudits(ctx, report, "")
failed, err := audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
require.Zero(t, failed)
@ -107,13 +107,12 @@ func TestRecordAuditsCorrectOutcome(t *testing.T) {
ShareSize: 10,
ExpectedShareHash: []byte{},
ReverifyCount: 0,
Path: "",
},
},
Offlines: []storj.NodeID{offlineNode},
}
failed, err := audits.Reporter.RecordAudits(ctx, report, "")
failed, err := audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
require.Zero(t, failed)
@ -155,7 +154,7 @@ func TestSuspensionTimeNotResetBySuccessiveAudit(t *testing.T) {
suspendedNode := planet.StorageNodes[0].ID()
failed, err := audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}, "")
failed, err := audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}})
require.NoError(t, err)
require.Zero(t, failed)
@ -168,7 +167,7 @@ func TestSuspensionTimeNotResetBySuccessiveAudit(t *testing.T) {
suspendedAt := node.UnknownAuditSuspended
failed, err = audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}}, "")
failed, err = audits.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{suspendedNode}})
require.NoError(t, err)
require.Zero(t, failed)
@ -224,7 +223,7 @@ func TestGracefullyExitedNotUpdated(t *testing.T) {
PendingAudits: []*audit.PendingAudit{&pending},
Unknown: storj.NodeIDList{unknownNode.ID()},
}
failed, err := audits.Reporter.RecordAudits(ctx, report, "")
failed, err := audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
assert.Zero(t, failed)
@ -253,7 +252,7 @@ func TestReportOfflineAudits(t *testing.T) {
audits.Worker.Loop.Pause()
cache := satellite.Overlay.DB
_, err := audits.Reporter.RecordAudits(ctx, audit.Report{Offlines: storj.NodeIDList{node.ID()}}, "")
_, err := audits.Reporter.RecordAudits(ctx, audit.Report{Offlines: storj.NodeIDList{node.ID()}})
require.NoError(t, err)
d, err := cache.Get(ctx, node.ID())

View File

@ -5,17 +5,20 @@ package audit
import (
"math/rand"
"time"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/metabase"
)
const maxReservoirSize = 3
// Reservoir holds a certain number of segments to reflect a random sample.
type Reservoir struct {
Paths [maxReservoirSize]storj.Path
size int8
index int64
Segments [maxReservoirSize]Segment
size int8
index int64
}
// NewReservoir instantiates a Reservoir.
@ -33,14 +36,35 @@ func NewReservoir(size int) *Reservoir {
// Sample makes sure that for every segment in metainfo from index i=size..n-1,
// pick a random number r = rand(0..i), and if r < size, replace reservoir.Segments[r] with segment.
func (reservoir *Reservoir) Sample(r *rand.Rand, path storj.Path) {
func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment) {
reservoir.index++
if reservoir.index < int64(reservoir.size) {
reservoir.Paths[reservoir.index] = path
reservoir.Segments[reservoir.index] = segment
} else {
random := r.Int63n(reservoir.index)
if random < int64(reservoir.size) {
reservoir.Paths[random] = path
reservoir.Segments[random] = segment
}
}
}
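Sample is standard reservoir sampling over the stream of segments the metainfo loop produces: the first few segments fill the slots, and each later segment replaces a random slot with decreasing probability, so the kept segments approximate a uniform random sample of everything seen. A small illustration (allSegments is a hypothetical slice of observed segments):

	r := rand.New(rand.NewSource(time.Now().Unix()))
	reservoir := NewReservoir(3)
	for _, seg := range allSegments {
		reservoir.Sample(r, seg)
	}
	// reservoir.Segments now holds up to 3 of the observed segments,
	// regardless of how many were streamed past it.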
// Segment is a segment to audit.
type Segment struct {
metabase.SegmentLocation
StreamID uuid.UUID
ExpirationDate time.Time
}
// NewSegment creates a new segment to audit from a metainfo loop segment.
func NewSegment(loopSegment *metainfo.Segment) Segment {
return Segment{
SegmentLocation: loopSegment.Location,
StreamID: loopSegment.StreamID,
ExpirationDate: loopSegment.ExpirationDate,
}
}
// Expired checks if segment is expired relative to now.
func (segment *Segment) Expired(now time.Time) bool {
return !segment.ExpirationDate.IsZero() && segment.ExpirationDate.Before(now)
}
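Expired lets the verifier skip segments whose TTL has passed without another database lookup; the reverify tests below exercise it by overriding the verifier's clock with SetNow. The check in isolation:

	withTTL := Segment{ExpirationDate: time.Now().Add(time.Hour)}
	noTTL := Segment{}
	now := time.Now()
	_ = withTTL.Expired(now)                    // false: not yet expired
	_ = withTTL.Expired(now.Add(2 * time.Hour)) // true: expiration date has passed
	_ = noTTL.Expired(now)                      // false: zero date means no expiration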

View File

@ -13,7 +13,6 @@ import (
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/peertls/tlsopts"
"storj.io/common/pkcrypto"
"storj.io/common/rpc"
@ -25,7 +24,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
)
@ -55,53 +53,55 @@ func TestReverifySuccess(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment.Redundancy.ShareSize
pieces := segment.Pieces
rootPieceID := segment.RootPieceID
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum))
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: pieces[0].NodeId,
NodeID: pieces[0].StorageNode,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Successes, 1)
require.Equal(t, report.Successes[0], pieces[0].NodeId)
require.Equal(t, report.Successes[0], pieces[0].StorageNode)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
_, err = audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
@ -135,60 +135,62 @@ func TestReverifyFailMissingShare(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment.Redundancy.ShareSize
pieces := segment.Pieces
rootPieceID := segment.RootPieceID
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum))
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: pieces[0].NodeId,
NodeID: pieces[0].StorageNode,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
// delete the piece from the first node
piece := pointer.GetRemote().GetRemotePieces()[0]
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
piece := pieces[0]
pieceID := rootPieceID.Derive(piece.StorageNode, int32(piece.Number))
node := planet.FindNode(piece.StorageNode)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], pieces[0].NodeId)
require.Equal(t, report.Fails[0], pieces[0].StorageNode)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
_, err = audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
@ -222,34 +224,37 @@ func TestReverifyFailBadData(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
redundancy := pointer.GetRemote().GetRedundancy()
pieces := segment.Pieces
rootPieceID := segment.RootPieceID
redundancy := segment.Redundancy
pending := &audit.PendingAudit{
NodeID: pieces[0].NodeId,
NodeID: pieces[0].StorageNode,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: redundancy.ErasureShareSize,
ShareSize: redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = satellite.DB.Containment().IncrementPending(ctx, pending)
require.NoError(t, err)
nodeID := pieces[0].NodeId
report, err := audits.Verifier.Reverify(ctx, path)
nodeID := pieces[0].StorageNode
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -259,7 +264,7 @@ func TestReverifyFailBadData(t *testing.T) {
require.Equal(t, report.Fails[0], nodeID)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
_, err = audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
@ -294,43 +299,46 @@ func TestReverifyOffline(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
redundancy := pointer.GetRemote().GetRedundancy()
pieces := segment.Pieces
rootPieceID := segment.RootPieceID
redundancy := segment.Redundancy
pending := &audit.PendingAudit{
NodeID: pieces[0].NodeId,
NodeID: pieces[0].StorageNode,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: redundancy.ErasureShareSize,
ShareSize: redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = satellite.DB.Containment().IncrementPending(ctx, pending)
require.NoError(t, err)
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(pieces[0].NodeId))
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(pieces[0].StorageNode))
require.NoError(t, err)
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
require.Len(t, report.Fails, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Offlines, 1)
require.Equal(t, report.Offlines[0], pieces[0].NodeId)
require.Equal(t, report.Offlines[0], pieces[0].StorageNode)
// make sure that pending audit is not removed
containment := satellite.DB.Containment()
@ -364,13 +372,16 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
@ -391,7 +402,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
satellite.Metainfo.Metabase,
dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
@ -400,25 +411,24 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
minBytesPerSecond,
5*time.Second)
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
redundancy := pointer.GetRemote().GetRedundancy()
pieces := segment.Pieces
rootPieceID := segment.RootPieceID
redundancy := segment.Redundancy
pending := &audit.PendingAudit{
NodeID: pieces[0].NodeId,
NodeID: pieces[0].StorageNode,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: redundancy.ErasureShareSize,
ShareSize: redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = satellite.DB.Containment().IncrementPending(ctx, pending)
require.NoError(t, err)
report, err := verifier.Reverify(ctx, path)
report, err := verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -442,7 +452,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// - uploads random data to all nodes
// - gets a path from the audit queue
// - gets a segment from the audit queue
// - creates one pending audit for a node holding a piece for that segment
// - deletes the file
// - calls reverify on the deleted file
@ -463,24 +473,27 @@ func TestReverifyDeletedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
nodeID := segment.Pieces[0].StorageNode
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pointer.GetRemote().RootPieceId,
PieceID: segment.RootPieceID,
StripeIndex: randomIndex,
ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(),
ShareSize: segment.Redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
containment := satellite.DB.Containment()
@ -493,7 +506,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
// call reverify on the deleted file and expect no error
// but expect that the node is still in containment
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
assert.Empty(t, report)
@ -507,11 +520,11 @@ func TestReverifyDeletedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path, err = queue.Next()
queueSegment, err = queue.Next()
require.NoError(t, err)
// reverify the new path
report, err = audits.Verifier.Reverify(ctx, path)
// reverify the new segment
report, err = audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
assert.Empty(t, report.Fails)
assert.Empty(t, report.Successes)
@ -538,7 +551,6 @@ func TestReverifyModifiedSegment(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -550,24 +562,27 @@ func TestReverifyModifiedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
pendingPath, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(pendingPath))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
nodeID := segment.Pieces[0].StorageNode
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pointer.GetRemote().RootPieceId,
PieceID: segment.RootPieceID,
StripeIndex: randomIndex,
ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(),
ShareSize: segment.Redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: pendingPath,
Segment: queueSegment.SegmentLocation,
}
containment := satellite.DB.Containment()
@ -577,8 +592,12 @@ func TestReverifyModifiedSegment(t *testing.T) {
// remove a piece from the file (a piece that the contained node isn't holding)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
pieceToRemove := pointer.Remote.RemotePieces[1]
_, err = metainfo.UpdatePieces(ctx, metabase.SegmentKey(pendingPath), pointer, nil, []*pb.RemotePiece{pieceToRemove})
err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
OldPieces: segment.Pieces,
NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...),
})
require.NoError(t, err)
}
@ -587,20 +606,20 @@ func TestReverifyModifiedSegment(t *testing.T) {
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
require.NoError(t, err)
// select the encrypted path that was not used for the pending audit
// select the segment that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path1, err := queue.Next()
queueSegment1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
queueSegment2, err := queue.Next()
require.NoError(t, err)
reverifyPath := path1
if path1 == pendingPath {
reverifyPath = path2
reverifySegment := queueSegment1
if queueSegment1 == queueSegment {
reverifySegment = queueSegment2
}
// reverify the path that was not modified
report, err := audits.Verifier.Reverify(ctx, reverifyPath)
// reverify the segment that was not modified
report, err := audits.Verifier.Reverify(ctx, reverifySegment)
require.NoError(t, err)
assert.Empty(t, report.Fails)
assert.Empty(t, report.Successes)
@ -638,24 +657,27 @@ func TestReverifyReplacedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
pendingPath, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(pendingPath))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
nodeID := segment.Pieces[0].StorageNode
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pointer.GetRemote().RootPieceId,
PieceID: segment.RootPieceID,
StripeIndex: randomIndex,
ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(),
ShareSize: segment.Redundancy.ShareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: pendingPath,
Segment: queueSegment.SegmentLocation,
}
containment := satellite.DB.Containment()
@ -672,20 +694,20 @@ func TestReverifyReplacedSegment(t *testing.T) {
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
require.NoError(t, err)
// select the encrypted path that was not used for the pending audit
// select the segment that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path1, err := queue.Next()
queueSegment1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
queueSegment2, err := queue.Next()
require.NoError(t, err)
reverifyPath := path1
if path1 == pendingPath {
reverifyPath = path2
reverifySegment := queueSegment1
if queueSegment1 == queueSegment {
reverifySegment = queueSegment2
}
// reverify the path that was not modified
report, err := audits.Verifier.Reverify(ctx, reverifyPath)
// reverify the segment that was not modified
report, err := audits.Verifier.Reverify(ctx, reverifySegment)
require.NoError(t, err)
assert.Empty(t, report.Fails)
assert.Empty(t, report.Successes)
@ -732,46 +754,52 @@ func TestReverifyDifferentShare(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path1, err := queue.Next()
queueSegment1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
queueSegment2, err := queue.Next()
require.NoError(t, err)
require.NotEqual(t, path1, path2)
require.NotEqual(t, queueSegment1, queueSegment2)
pointer1, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path1))
segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment1.StreamID,
Position: queueSegment1.Position,
})
require.NoError(t, err)
pointer2, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path2))
segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment2.StreamID,
Position: queueSegment2.Position,
})
require.NoError(t, err)
// find a node that contains a piece for both files
// save that node ID and the piece number associated with it for pointer1
// save that node ID and the piece number associated with it for segment1
var selectedNode storj.NodeID
var selectedPieceNum int32
p1Nodes := make(map[storj.NodeID]int32)
for _, piece := range pointer1.GetRemote().GetRemotePieces() {
p1Nodes[piece.NodeId] = piece.PieceNum
var selectedPieceNum uint16
p1Nodes := make(map[storj.NodeID]uint16)
for _, piece := range segment1.Pieces {
p1Nodes[piece.StorageNode] = piece.Number
}
for _, piece := range pointer2.GetRemote().GetRemotePieces() {
pieceNum, ok := p1Nodes[piece.NodeId]
for _, piece := range segment2.Pieces {
pieceNum, ok := p1Nodes[piece.StorageNode]
if ok {
selectedNode = piece.NodeId
selectedNode = piece.StorageNode
selectedPieceNum = pieceNum
break
}
}
require.NotEqual(t, selectedNode, storj.NodeID{})
randomIndex, err := audit.GetRandomStripe(ctx, pointer1)
randomIndex, err := audit.GetRandomStripe(ctx, segment1)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment1.Redundancy.ShareSize
rootPieceID := segment1.RootPieceID
rootPieceID := pointer1.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, selectedNode, selectedPieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
@ -784,21 +812,21 @@ func TestReverifyDifferentShare(t *testing.T) {
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path1,
Segment: queueSegment1.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
// delete the piece for pointer1 from the selected node
pieceID := pointer1.GetRemote().RootPieceId.Derive(selectedNode, selectedPieceNum)
// delete the piece for segment1 from the selected node
pieceID := segment1.RootPieceID.Derive(selectedNode, int32(selectedPieceNum))
node := planet.FindNode(selectedNode)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
// reverify with path 2. Since the selected node was put in containment for path1,
// it should be audited for path1 and fail
report, err := audits.Verifier.Reverify(ctx, path2)
// reverify with segment2. Since the selected node was put in containment for segment1,
// it should be audited for segment1 and fail
report, err := audits.Verifier.Reverify(ctx, queueSegment2)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -807,8 +835,8 @@ func TestReverifyDifferentShare(t *testing.T) {
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], selectedNode)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path2)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
@ -831,30 +859,21 @@ func TestReverifyExpired1(t *testing.T) {
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
// set pointer's expiration date to be already expired
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
oldPointerBytes, err := pb.Marshal(pointer)
require.NoError(t, err)
newPointer := &pb.Pointer{}
err = pb.Unmarshal(oldPointerBytes, newPointer)
require.NoError(t, err)
newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour)
newPointerBytes, err := pb.Marshal(newPointer)
require.NoError(t, err)
err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
require.NoError(t, err)
// move time into the future so the segment is expired
audits.Verifier.SetNow(func() time.Time {
return time.Now().Add(2 * time.Hour)
})
// Reverify should not return an error
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
assert.Len(t, report.Successes, 0)
@ -884,7 +903,7 @@ func TestReverifyExpired2(t *testing.T) {
testData1 := testrand.Bytes(8 * memory.KiB)
testData2 := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path1", testData1, time.Now().Add(1*time.Hour))
require.NoError(t, err)
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
@ -892,46 +911,57 @@ func TestReverifyExpired2(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path1, err := queue.Next()
queueSegment1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
queueSegment2, err := queue.Next()
require.NoError(t, err)
require.NotEqual(t, path1, path2)
require.NotEqual(t, queueSegment1, queueSegment2)
pointer1, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path1))
// make sure queueSegment1 is the one with the expiration date
if queueSegment1.ExpirationDate.IsZero() {
queueSegment1, queueSegment2 = queueSegment2, queueSegment1
}
segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment1.StreamID,
Position: queueSegment1.Position,
})
require.NoError(t, err)
pointer2, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path2))
segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment2.StreamID,
Position: queueSegment2.Position,
})
require.NoError(t, err)
// find a node that contains a piece for both files
// save that node ID and the piece number associated with it for pointer1
// save that node ID and the piece number associated with it for segment1
var selectedNode storj.NodeID
var selectedPieceNum int32
p1Nodes := make(map[storj.NodeID]int32)
for _, piece := range pointer1.GetRemote().GetRemotePieces() {
p1Nodes[piece.NodeId] = piece.PieceNum
var selectedPieceNum uint16
p1Nodes := make(map[storj.NodeID]uint16)
for _, piece := range segment1.Pieces {
p1Nodes[piece.StorageNode] = piece.Number
}
for _, piece := range pointer2.GetRemote().GetRemotePieces() {
pieceNum, ok := p1Nodes[piece.NodeId]
for _, piece := range segment2.Pieces {
pieceNum, ok := p1Nodes[piece.StorageNode]
if ok {
selectedNode = piece.NodeId
selectedNode = piece.StorageNode
selectedPieceNum = pieceNum
break
}
}
require.NotEqual(t, selectedNode, storj.NodeID{})
randomIndex, err := audit.GetRandomStripe(ctx, pointer1)
randomIndex, err := audit.GetRandomStripe(ctx, segment1)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment1.Redundancy.ShareSize
rootPieceID := segment1.RootPieceID
rootPieceID := pointer1.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, selectedNode, selectedPieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
@ -944,29 +974,22 @@ func TestReverifyExpired2(t *testing.T) {
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path1,
Segment: queueSegment1.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
// update pointer1 to be expired
oldPointerBytes, err := pb.Marshal(pointer1)
require.NoError(t, err)
newPointer := &pb.Pointer{}
err = pb.Unmarshal(oldPointerBytes, newPointer)
require.NoError(t, err)
newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour)
newPointerBytes, err := pb.Marshal(newPointer)
require.NoError(t, err)
err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path1), oldPointerBytes, newPointerBytes)
require.NoError(t, err)
// move time into the future so segment1 is expired
audits.Verifier.SetNow(func() time.Time {
return time.Now().Add(2 * time.Hour)
})
// reverify with path 2. Since the selected node was put in containment for path1,
// it should be audited for path1
// since path1 has expired, we expect no failure and we expect that the pointer has been deleted
// reverify with segment2. Since the selected node was put in containment for segment1,
// it should be audited for segment1
// since segment1 has expired, we expect no failure and we expect that the segment has been deleted
// and that the selected node has been removed from containment mode
report, err := audits.Verifier.Reverify(ctx, path2)
report, err := audits.Verifier.Reverify(ctx, queueSegment2)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -976,7 +999,7 @@ func TestReverifyExpired2(t *testing.T) {
// Reverify should remove the node from containment mode
_, err = containment.Get(ctx, pending.NodeID)
require.Error(t, err)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@ -1012,40 +1035,41 @@ func TestReverifySlowDownload(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
slowPiece := pointer.Remote.RemotePieces[0]
slowNode := slowPiece.NodeId
slowPiece := segment.Pieces[0]
slowNode := slowPiece.StorageNode
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment.Redundancy.ShareSize
rootPieceID := segment.RootPieceID
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, slowNode, slowPiece.PieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), slowNode, slowPiece.Number, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum))
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: slowNode,
PieceID: pointer.Remote.RootPieceId,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
@ -1057,7 +1081,7 @@ func TestReverifySlowDownload(t *testing.T) {
delay := 1 * time.Second
slowNodeDB.SetLatency(delay)
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -1096,40 +1120,41 @@ func TestReverifyUnknownError(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
badPiece := pointer.Remote.RemotePieces[0]
badNode := badPiece.NodeId
badPiece := segment.Pieces[0]
badNode := badPiece.StorageNode
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
shareSize := segment.Redundancy.ShareSize
rootPieceID := segment.RootPieceID
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, badNode, badPiece.PieceNum, rootPieceID, shareSize)
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), badNode, badPiece.Number, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum))
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(badPiece.Number))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: badNode,
PieceID: pointer.Remote.RootPieceId,
PieceID: rootPieceID,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
Segment: queueSegment.SegmentLocation,
}
err = containment.IncrementPending(ctx, pending)
@ -1140,7 +1165,7 @@ func TestReverifyUnknownError(t *testing.T) {
// return an error when the satellite requests a share
badNodeDB.SetError(errs.New("unknown error"))
report, err := audits.Verifier.Reverify(ctx, path)
report, err := audits.Verifier.Reverify(ctx, queueSegment)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
@ -1150,8 +1175,8 @@ func TestReverifyUnknownError(t *testing.T) {
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded

View File

@ -27,7 +27,6 @@ import (
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/piecestore"
)
@ -55,7 +54,7 @@ type Share struct {
// architecture: Worker
type Verifier struct {
log *zap.Logger
metainfo *metainfo.Service
metabase metainfo.MetabaseDB
orders *orders.Service
auditor *identity.PeerIdentity
dialer rpc.Dialer
@ -64,14 +63,15 @@ type Verifier struct {
minBytesPerSecond memory.Size
minDownloadTimeout time.Duration
nowFn func() time.Time
OnTestingCheckSegmentAlteredHook func()
}
// NewVerifier creates a Verifier.
func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier {
func NewVerifier(log *zap.Logger, metabase metainfo.MetabaseDB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier {
return &Verifier{
log: log,
metainfo: metainfo,
metabase: metabase,
orders: orders,
auditor: id.PeerIdentity(),
dialer: dialer,
@ -79,33 +79,32 @@ func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer,
containment: containment,
minBytesPerSecond: minBytesPerSecond,
minDownloadTimeout: minDownloadTimeout,
nowFn: time.Now,
}
}
// Verify downloads shares then verifies the data correctness at a random stripe.
func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report Report, err error) {
func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error) {
defer mon.Task()(&ctx)(&err)
pointerBytes, pointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(path))
if segment.Expired(verifier.nowFn()) {
verifier.log.Debug("segment expired before Verify")
return Report{}, nil
}
segmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: segment.StreamID,
Position: segment.Position,
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
if metabase.ErrSegmentNotFound.Has(err) {
verifier.log.Debug("segment deleted before Verify")
return Report{}, nil
}
return Report{}, err
}
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
verifier.log.Debug("segment expired before Verify")
return Report{}, nil
}
randomIndex, err := GetRandomStripe(ctx, pointer)
if err != nil {
return Report{}, err
}
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
randomIndex, err := GetRandomStripe(ctx, segmentInfo)
if err != nil {
return Report{}, err
}
@ -116,27 +115,27 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
containedNodes := make(map[int]storj.NodeID)
sharesToAudit := make(map[int]Share)
orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segmentLocation.Bucket(), pointer, skip)
orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segment.Bucket(), segmentInfo, skip)
if err != nil {
return Report{}, err
}
// NOTE offlineNodes will include disqualified nodes because they aren't in
// the skip list
offlineNodes = getOfflineNodes(pointer, orderLimits, skip)
offlineNodes = getOfflineNodes(segmentInfo, orderLimits, skip)
if len(offlineNodes) > 0 {
verifier.log.Debug("Verify: order limits not created for some nodes (offline/disqualified)",
zap.Strings("Node IDs", offlineNodes.Strings()))
}
shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedIPsAndPorts, randomIndex, segmentInfo.Redundancy.ShareSize)
if err != nil {
return Report{
Offlines: offlineNodes,
}, err
}
err = verifier.checkIfSegmentAltered(ctx, path, pointer, pointerBytes)
err = verifier.checkIfSegmentAltered(ctx, segment.SegmentLocation, segmentInfo)
if err != nil {
if ErrSegmentDeleted.Has(err) {
verifier.log.Debug("segment deleted during Verify")
@ -209,10 +208,10 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
mon.IntVal("verify_shares_downloaded_successfully").Observe(int64(len(sharesToAudit))) //mon:locked
required := int(pointer.Remote.Redundancy.GetMinReq())
total := int(pointer.Remote.Redundancy.GetTotal())
required := segmentInfo.Redundancy.RequiredShares
total := segmentInfo.Redundancy.TotalShares
if len(sharesToAudit) < required {
if len(sharesToAudit) < int(required) {
mon.Counter("not_enough_shares_for_audit").Inc(1)
return Report{
Fails: failedNodes,
@ -238,14 +237,14 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, unknownNodes, containedNodes)
totalInPointer := len(pointer.GetRemote().GetRemotePieces())
totalInSegment := len(segmentInfo.Pieces)
numOffline := len(offlineNodes)
numSuccessful := len(successNodes)
numFailed := len(failedNodes)
numContained := len(containedNodes)
numUnknown := len(unknownNodes)
totalAudited := numSuccessful + numFailed + numOffline + numContained
auditedPercentage := float64(totalAudited) / float64(totalInPointer)
auditedPercentage := float64(totalAudited) / float64(totalInSegment)
offlinePercentage := float64(0)
successfulPercentage := float64(0)
failedPercentage := float64(0)
@ -265,7 +264,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
mon.Meter("audit_contained_nodes_global").Mark(numContained) //mon:locked
mon.Meter("audit_unknown_nodes_global").Mark(numUnknown) //mon:locked
mon.Meter("audit_total_nodes_global").Mark(totalAudited) //mon:locked
mon.Meter("audit_total_pointer_nodes_global").Mark(totalInPointer) //mon:locked
mon.Meter("audit_total_pointer_nodes_global").Mark(totalInSegment) //mon:locked
mon.IntVal("audit_success_nodes").Observe(int64(numSuccessful)) //mon:locked
mon.IntVal("audit_fail_nodes").Observe(int64(numFailed)) //mon:locked
@ -273,7 +272,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
mon.IntVal("audit_contained_nodes").Observe(int64(numContained)) //mon:locked
mon.IntVal("audit_unknown_nodes").Observe(int64(numUnknown)) //mon:locked
mon.IntVal("audit_total_nodes").Observe(int64(totalAudited)) //mon:locked
mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInPointer)) //mon:locked
mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInSegment)) //mon:locked
mon.FloatVal("audited_percentage").Observe(auditedPercentage) //mon:locked
mon.FloatVal("audit_offline_percentage").Observe(offlinePercentage) //mon:locked
mon.FloatVal("audit_successful_percentage").Observe(successfulPercentage) //mon:locked
@ -281,7 +280,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
mon.FloatVal("audit_contained_percentage").Observe(containedPercentage) //mon:locked
mon.FloatVal("audit_unknown_percentage").Observe(unknownPercentage) //mon:locked
pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, pointer, randomIndex, path)
pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, segment, segmentInfo, randomIndex)
if err != nil {
return Report{
Successes: successNodes,
@ -301,7 +300,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
}
// DownloadShares downloads shares from the nodes where remote pieces are located.
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int64, shareSize int32) (shares map[int]Share, err error) {
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int32, shareSize int32) (shares map[int]Share, err error) {
defer mon.Task()(&ctx)(&err)
shares = make(map[int]Share, len(limits))
@ -339,7 +338,7 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.Addre
}
// Reverify reverifies the contained nodes in the stripe.
func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report Report, err error) {
func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error) {
defer mon.Task()(&ctx)(&err)
// result status enum
@ -360,38 +359,45 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
err error
}
pointer, err := verifier.metainfo.Get(ctx, metabase.SegmentKey(path))
if segment.Expired(verifier.nowFn()) {
verifier.log.Debug("segment expired before Reverify")
return Report{}, nil
}
segmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: segment.StreamID,
Position: segment.Position,
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
if metabase.ErrSegmentNotFound.Has(err) {
verifier.log.Debug("segment deleted before Reverify")
return Report{}, nil
}
return Report{}, err
}
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
verifier.log.Debug("Segment expired before Reverify")
return Report{}, nil
}
pieces := pointer.GetRemote().GetRemotePieces()
pieces := segmentInfo.Pieces
ch := make(chan result, len(pieces))
var containedInSegment int64
for _, piece := range pieces {
pending, err := verifier.containment.Get(ctx, piece.NodeId)
pending, err := verifier.containment.Get(ctx, piece.StorageNode)
if err != nil {
if ErrContainedNotFound.Has(err) {
ch <- result{nodeID: piece.NodeId, status: skipped}
ch <- result{nodeID: piece.StorageNode, status: skipped}
continue
}
ch <- result{nodeID: piece.NodeId, status: erred, err: err}
verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
ch <- result{nodeID: piece.StorageNode, status: erred, err: err}
verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", piece.StorageNode), zap.Error(err))
continue
}
containedInSegment++
go func(pending *PendingAudit) {
pendingPointerBytes, pendingPointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(pending.Path))
// TODO: Get the exact version of the object. But where should the version come from in the pending audit?
pendingObject, err := verifier.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: pending.Segment.Object(),
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
ch <- result{nodeID: pending.NodeID, status: skipped}
@ -399,24 +405,41 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
ch <- result{nodeID: pending.NodeID, status: erred, err: err}
verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
verifier.log.Debug("Reverify: error getting pending segment's object from metabase", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
if !pendingPointer.ExpirationDate.IsZero() && pendingPointer.ExpirationDate.Before(time.Now().UTC()) {
if pendingObject.ExpiresAt != nil && !pendingObject.ExpiresAt.IsZero() && pendingObject.ExpiresAt.Before(verifier.nowFn()) {
verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID))
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
if pendingPointer.GetRemote().RootPieceId != pending.PieceID {
pendingSegmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: pendingObject.StreamID,
Position: pending.Segment.Position,
})
if err != nil {
if metabase.ErrSegmentNotFound.Has(err) {
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
ch <- result{nodeID: pending.NodeID, status: erred, err: err}
verifier.log.Debug("Reverify: error getting pending segment from metabase", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
// TODO: is this check still necessary? If the segment was found by its StreamID and position, the RootPieceID should not have changed.
if pendingSegmentInfo.RootPieceID != pending.PieceID {
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
var pieceNum int32
var pieceNum uint16
found := false
for _, piece := range pendingPointer.GetRemote().GetRemotePieces() {
if piece.NodeId == pending.NodeID {
pieceNum = piece.PieceNum
for _, piece := range pendingSegmentInfo.Pieces {
if piece.StorageNode == pending.NodeID {
pieceNum = piece.Number
found = true
}
}
@ -425,13 +448,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
return
}
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(pending.Path)) // TODO: this should be parsed in pending
if err != nil {
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, segmentLocation.Bucket(), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.Segment.Bucket(), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
if err != nil {
if overlay.ErrNodeDisqualified.Has(err) {
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
@ -497,8 +514,8 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
return
}
if errs2.IsRPC(err, rpcstatus.NotFound) {
// Get the original segment pointer in the metainfo
err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes)
// Get the original segment
err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo)
if err != nil {
ch <- result{nodeID: pending.NodeID, status: skipped}
verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
@ -525,7 +542,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
ch <- result{nodeID: pending.NodeID, status: success}
verifier.log.Info("Reverify: hashes match (audit success)", zap.Stringer("Node ID", pending.NodeID))
} else {
err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes)
err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo)
if err != nil {
ch <- result{nodeID: pending.NodeID, status: skipped}
verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
@ -580,7 +597,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
// GetShare use piece store client to download shares from nodes.
func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error) {
defer mon.Task()(&ctx)(&err)
bandwidthMsgSize := shareSize
@ -632,7 +649,7 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
}
}()
offset := int64(shareSize) * stripeIndex
offset := int64(shareSize) * int64(stripeIndex)
downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, offset, int64(shareSize))
if err != nil {
@ -654,38 +671,42 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
}, nil
}
// checkIfSegmentAltered checks if path's pointer has been altered since path was selected.
func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, segmentKey string, oldPointer *pb.Pointer, oldPointerBytes []byte) (err error) {
// checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit.
func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, location metabase.SegmentLocation, oldSegment metabase.Segment) (err error) {
defer mon.Task()(&ctx)(&err)
if verifier.OnTestingCheckSegmentAlteredHook != nil {
verifier.OnTestingCheckSegmentAlteredHook()
}
newPointerBytes, newPointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(segmentKey))
newSegment, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: oldSegment.StreamID,
Position: oldSegment.Position,
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
return ErrSegmentDeleted.New("%q", segmentKey)
if metabase.ErrSegmentNotFound.Has(err) {
return ErrSegmentDeleted.New("%q", location.Encode())
}
return err
}
if oldPointer != nil && oldPointer.CreationDate != newPointer.CreationDate {
return ErrSegmentDeleted.New("%q", segmentKey)
}
if !bytes.Equal(oldPointerBytes, newPointerBytes) {
return ErrSegmentModified.New("%q", segmentKey)
if !oldSegment.Pieces.Equal(newSegment.Pieces) {
return ErrSegmentModified.New("%q", location.Encode())
}
return nil
}
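Aside, not part of the diff: a minimal sketch of what the new comparison amounts to, assuming the metabase Pieces type and its Equal method as used above; the node IDs are random placeholders.

package main

import (
	"fmt"

	"storj.io/common/testrand"
	"storj.io/storj/satellite/metainfo/metabase"
)

func main() {
	nodeA, nodeB := testrand.NodeID(), testrand.NodeID()

	// piece list captured when the segment was queued for audit
	queued := metabase.Pieces{{Number: 0, StorageNode: nodeA}, {Number: 1, StorageNode: nodeB}}
	// piece list as currently stored in metabase (piece 1 has since been removed)
	current := metabase.Pieces{{Number: 0, StorageNode: nodeA}}

	// only a differing piece list counts as modification now; raw pointer bytes are no longer compared
	fmt.Println("altered:", !queued.Equal(current)) // altered: true -> ErrSegmentModified
}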
// SetNow allows tests to have the server act as if the current time is whatever they want.
func (verifier *Verifier) SetNow(nowFn func() time.Time) {
verifier.nowFn = nowFn
}
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares,
// and a slice of the corrected shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) {
func auditShares(ctx context.Context, required, total int16, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) {
defer mon.Task()(&ctx)(&err)
f, err := infectious.NewFEC(required, total)
f, err := infectious.NewFEC(int(required), int(total))
if err != nil {
return nil, nil, err
}
@ -720,9 +741,9 @@ func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectio
return copies, nil
}
// getOfflines nodes returns these storage nodes from pointer which have no
// getOfflineNodes returns those storage nodes from the segment which have no
// order limit nor are skipped.
func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList {
func getOfflineNodes(segment metabase.Segment, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList {
var offlines storj.NodeIDList
nodesWithLimit := make(map[storj.NodeID]bool, len(limits))
@ -732,9 +753,9 @@ func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip
}
}
for _, piece := range pointer.GetRemote().GetRemotePieces() {
if !nodesWithLimit[piece.NodeId] && !skip[piece.NodeId] {
offlines = append(offlines, piece.NodeId)
for _, piece := range segment.Pieces {
if !nodesWithLimit[piece.StorageNode] && !skip[piece.StorageNode] {
offlines = append(offlines, piece.StorageNode)
}
}
@ -767,17 +788,16 @@ func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, off
return successNodes
}
func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, pointer *pb.Pointer, randomIndex int64, path storj.Path) (pending []*PendingAudit, err error) {
func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, segment Segment, segmentInfo metabase.Segment, randomIndex int32) (pending []*PendingAudit, err error) {
defer mon.Task()(&ctx)(&err)
if len(containedNodes) == 0 {
return nil, nil
}
redundancy := pointer.GetRemote().GetRedundancy()
required := int(redundancy.GetMinReq())
total := int(redundancy.GetTotal())
shareSize := redundancy.GetErasureShareSize()
required := int(segmentInfo.Redundancy.RequiredShares)
total := int(segmentInfo.Redundancy.TotalShares)
shareSize := segmentInfo.Redundancy.ShareSize
fec, err := infectious.NewFEC(required, total)
if err != nil {
@ -797,11 +817,11 @@ func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeI
}
pending = append(pending, &PendingAudit{
NodeID: nodeID,
PieceID: pointer.GetRemote().RootPieceId,
PieceID: segmentInfo.RootPieceID,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share),
Path: path,
Segment: segment.SegmentLocation,
})
}
@ -820,23 +840,19 @@ func rebuildStripe(ctx context.Context, fec *infectious.FEC, corrected []infecti
return stripe, nil
}
// GetRandomStripe takes a pointer and returns a random stripe index within that pointer.
func GetRandomStripe(ctx context.Context, pointer *pb.Pointer) (index int64, err error) {
// GetRandomStripe takes a segment and returns a random stripe index within that segment.
func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error) {
defer mon.Task()(&ctx)(&err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
if err != nil {
return 0, err
}
// the last segment could be smaller than stripe size
if pointer.GetSegmentSize() < int64(redundancy.StripeSize()) {
if segment.EncryptedSize < segment.Redundancy.StripeSize() {
return 0, nil
}
var src cryptoSource
rnd := rand.New(src)
numStripes := pointer.GetSegmentSize() / int64(redundancy.StripeSize())
randomStripeIndex := rnd.Int63n(numStripes)
numStripes := segment.EncryptedSize / segment.Redundancy.StripeSize()
randomStripeIndex := rnd.Int31n(numStripes)
return randomStripeIndex, nil
}
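Aside, not part of the diff: a rough sketch of how the int32 stripe index relates to the piece download offset used in GetShare, assuming the storj.RedundancyScheme fields used above; the concrete sizes are made up.

package main

import (
	"fmt"

	"storj.io/common/storj"
)

func main() {
	rs := storj.RedundancyScheme{
		Algorithm:      storj.ReedSolomon,
		RequiredShares: 4,
		TotalShares:    6,
		ShareSize:      256,
	}
	encryptedSize := int32(10 * 1024) // plays the role of segment.EncryptedSize

	numStripes := encryptedSize / rs.StripeSize() // GetRandomStripe draws rnd.Int31n(numStripes)
	stripeIndex := numStripes - 1                 // pretend the last stripe was chosen

	offset := int64(rs.ShareSize) * int64(stripeIndex) // offset handed to the piecestore download
	fmt.Println(numStripes, stripeIndex, offset)       // 10 9 2304
}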

View File

@ -15,7 +15,6 @@ import (
"storj.io/common/errs2"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
@ -27,7 +26,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
)
@ -50,21 +48,23 @@ func TestDownloadSharesHappyPath(t *testing.T) {
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
shareSize := segment.Redundancy.ShareSize
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
@ -101,25 +101,27 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
shareSize := segment.Redundancy.ShareSize
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
// stop the first node in the pointer
stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
// stop the first node in the segment
stoppedNodeID := segment.Pieces[0].StorageNode
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
require.NoError(t, err)
@ -162,23 +164,25 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
// replace the piece id of the selected stripe with a new random one
// to simulate missing piece on the storage nodes
pointer.GetRemote().RootPieceId = storj.NewPieceID()
segment.RootPieceID = storj.NewPieceID()
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
shareSize := segment.Redundancy.ShareSize
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
@ -217,17 +221,18 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"}
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
require.NoError(t, err)
@ -245,7 +250,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
satellite.Metainfo.Metabase,
dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
@ -254,8 +259,9 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
minBytesPerSecond,
5*time.Second)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
shareSize := segment.Redundancy.ShareSize
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
@ -299,17 +305,18 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
randomIndex, err := audit.GetRandomStripe(ctx, segment)
require.NoError(t, err)
// This config value will create a very short timeframe allowed for receiving
@ -318,7 +325,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
satellite.Metainfo.Metabase,
satellite.Dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
@ -327,8 +334,9 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
minBytesPerSecond,
150*time.Millisecond)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
shareSize := segment.Redundancy.ShareSize
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
require.NoError(t, err)
// make downloads on storage node slower than the timeout on the satellite for downloading shares
@ -363,16 +371,19 @@ func TestVerifierHappyPath(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces()))
assert.Len(t, report.Successes, len(segment.Pieces))
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.PendingAudits, 0)
@ -392,30 +403,21 @@ func TestVerifierExpired(t *testing.T) {
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
// set pointer's expiration date to be already expired
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
oldPointerBytes, err := pb.Marshal(pointer)
require.NoError(t, err)
newPointer := &pb.Pointer{}
err = pb.Unmarshal(oldPointerBytes, newPointer)
require.NoError(t, err)
newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour)
newPointerBytes, err := pb.Marshal(newPointer)
require.NoError(t, err)
err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
require.NoError(t, err)
// move time into the future so the segment is expired
audits.Verifier.SetNow(func() time.Time {
return time.Now().Add(2 * time.Hour)
})
// Verify should not return an error
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, 0)
@ -444,21 +446,24 @@ func TestVerifierOfflineNode(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
// stop the first node in the pointer
stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
// stop the first node in the segment
stoppedNodeID := segment.Pieces[0].StorageNode
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces())-1)
assert.Len(t, report.Successes, len(segment.Pieces)-1)
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 1)
assert.Len(t, report.PendingAudits, 0)
@ -483,21 +488,24 @@ func TestVerifierMissingPiece(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
// delete the piece from the first node
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
piece := pointer.GetRemote().GetRemotePieces()[0]
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
origNumPieces := len(segment.Pieces)
piece := segment.Pieces[0]
pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
node := planet.FindNode(piece.StorageNode)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, origNumPieces-1)
@ -525,10 +533,13 @@ func TestVerifierDialTimeout(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
@ -548,7 +559,7 @@ func TestVerifierDialTimeout(t *testing.T) {
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
satellite.Metainfo.Metabase,
dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
@ -557,12 +568,12 @@ func TestVerifierDialTimeout(t *testing.T) {
minBytesPerSecond,
5*time.Second)
report, err := verifier.Verify(ctx, path, nil)
report, err := verifier.Verify(ctx, queueSegment, nil)
require.True(t, audit.ErrNotEnoughShares.Has(err), "unexpected error: %+v", err)
assert.Len(t, report.Successes, 0)
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, len(pointer.GetRemote().GetRemotePieces()))
assert.Len(t, report.Offlines, len(segment.Pieces))
assert.Len(t, report.PendingAudits, 0)
})
}
@ -585,7 +596,7 @@ func TestVerifierDeletedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
segment, err := queue.Next()
require.NoError(t, err)
// delete the file
@ -593,7 +604,7 @@ func TestVerifierDeletedSegment(t *testing.T) {
require.NoError(t, err)
// Verify should not return an error, but report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, segment, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
@ -605,7 +616,6 @@ func TestVerifierModifiedSegment(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -618,20 +628,28 @@ func TestVerifierModifiedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
// remove one piece from the segment so that checkIfSegmentAltered fails
pointer, err := metainfo.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
pieceToRemove := pointer.Remote.RemotePieces[0]
_, err = metainfo.UpdatePieces(ctx, metabase.SegmentKey(path), pointer, nil, []*pb.RemotePiece{pieceToRemove})
err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
OldPieces: segment.Pieces,
NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...),
})
require.NoError(t, err)
}
// Verify should not return an error, but report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
@ -655,7 +673,7 @@ func TestVerifierReplacedSegment(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
segment, err := queue.Next()
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
@ -665,7 +683,7 @@ func TestVerifierReplacedSegment(t *testing.T) {
}
// Verify should not return an error, but report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, segment, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
@ -689,26 +707,29 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
// delete the piece from the first node
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
piece := pointer.GetRemote().GetRemotePieces()[0]
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
origNumPieces := len(segment.Pieces)
piece := segment.Pieces[0]
pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
node := planet.FindNode(piece.StorageNode)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, origNumPieces-1)
assert.Len(t, report.Fails, 1)
assert.Equal(t, report.Fails[0], piece.NodeId)
assert.Equal(t, report.Fails[0], piece.StorageNode)
assert.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
})
@ -747,19 +768,22 @@ func TestVerifierSlowDownload(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
slowNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId)
slowNode := planet.FindNode(segment.Pieces[0].StorageNode)
slowNodeDB := slowNode.DB.(*testblobs.SlowDB)
// make downloads on storage node slower than the timeout on the satellite for downloading shares
delay := 1 * time.Second
slowNodeDB.SetLatency(delay)
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
assert.NotContains(t, report.Successes, slowNode.ID())
@ -797,18 +821,21 @@ func TestVerifierUnknownError(t *testing.T) {
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
badNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId)
badNode := planet.FindNode(segment.Pieces[0].StorageNode)
badNodeDB := badNode.DB.(*testblobs.BadDB)
// return an error when the verifier attempts to download from this node
badNodeDB.SetError(errs.New("unknown error"))
report, err := audits.Verifier.Verify(ctx, path, nil)
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
require.NoError(t, err)
require.Len(t, report.Successes, 3)

View File

@ -7,16 +7,15 @@ import (
"context"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vivint/infectious"
"storj.io/common/pb"
"storj.io/common/pkcrypto"
"storj.io/common/storj"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/metabase"
)
func TestFailingAudit(t *testing.T) {
@ -130,28 +129,26 @@ func TestCreatePendingAudits(t *testing.T) {
contained := make(map[int]storj.NodeID)
contained[1] = testNodeID
pointer := &pb.Pointer{
CreationDate: time.Now(),
Type: pb.Pointer_REMOTE,
Remote: &pb.RemoteSegment{
RootPieceId: storj.NewPieceID(),
Redundancy: &pb.RedundancyScheme{
MinReq: 8,
Total: 14,
ErasureShareSize: int32(len(shares[0].Data)),
},
segment := testSegment("test")
segmentInfo := metabase.Segment{
StreamID: segment.StreamID,
RootPieceID: testrand.PieceID(),
Redundancy: storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
RequiredShares: required,
TotalShares: total,
ShareSize: int32(len(shares[0].Data)),
},
}
randomIndex := rand.Int31n(10)
randomIndex := rand.Int63n(10)
pending, err := createPendingAudits(ctx, contained, shares, pointer, randomIndex, "")
pending, err := createPendingAudits(ctx, contained, shares, segment, segmentInfo, randomIndex)
require.NoError(t, err)
require.Equal(t, 1, len(pending))
assert.Equal(t, testNodeID, pending[0].NodeID)
assert.Equal(t, pointer.Remote.RootPieceId, pending[0].PieceID)
assert.Equal(t, segmentInfo.RootPieceID, pending[0].PieceID)
assert.Equal(t, randomIndex, pending[0].StripeIndex)
assert.Equal(t, pointer.Remote.Redundancy.ErasureShareSize, pending[0].ShareSize)
assert.Equal(t, segmentInfo.Redundancy.ShareSize, pending[0].ShareSize)
assert.Equal(t, pkcrypto.SHA256Hash(shares[1].Data), pending[0].ExpectedShareHash)
assert.EqualValues(t, 0, pending[0].ReverifyCount)
}

View File

@ -28,7 +28,7 @@ type Config struct {
ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m"`
QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m"`
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
WorkerConcurrency int `help:"number of workers to run audits on paths" default:"2"`
WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
}
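Aside, not part of the diff: the defaults above written out as a literal, assuming the audit.Config fields visible in this hunk; fields outside the hunk are left at their zero values.

package main

import (
	"time"

	"storj.io/storj/satellite/audit"
)

func main() {
	cfg := audit.Config{
		ChoreInterval:     24 * time.Hour, // releaseDefault above
		QueueInterval:     time.Hour,
		Slots:             3,
		WorkerConcurrency: 2, // workers now run audits on segments rather than paths
	}
	_ = cfg
}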
// Worker contains information for populating audit queue and processing audits.
@ -86,7 +86,7 @@ func (worker *Worker) process(ctx context.Context) (err error) {
worker.limiter.Wait()
for {
path, err := queue.Next()
segment, err := queue.Next()
if err != nil {
if ErrEmptyQueue.Has(err) {
// get a new queue and return if empty; otherwise continue working.
@ -100,27 +100,27 @@ func (worker *Worker) process(ctx context.Context) (err error) {
}
worker.limiter.Go(ctx, func() {
err := worker.work(ctx, path)
err := worker.work(ctx, segment)
if err != nil {
worker.log.Error("audit failed", zap.Binary("Segment", []byte(path)), zap.Error(err))
worker.log.Error("audit failed", zap.ByteString("Segment", []byte(segment.Encode())), zap.Error(err))
}
})
}
}
func (worker *Worker) work(ctx context.Context, path storj.Path) (err error) {
func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
defer mon.Task()(&ctx)(&err)
var errlist errs.Group
// First, attempt to reverify nodes for this segment that are in containment mode.
report, err := worker.verifier.Reverify(ctx, path)
report, err := worker.verifier.Reverify(ctx, segment)
if err != nil {
errlist.Add(err)
}
// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
_, err = worker.reporter.RecordAudits(ctx, report, path)
_, err = worker.reporter.RecordAudits(ctx, report)
if err != nil {
errlist.Add(err)
}
@ -144,13 +144,13 @@ func (worker *Worker) work(ctx context.Context, path storj.Path) (err error) {
}
// Next, audit the remaining nodes that are not in containment mode.
report, err = worker.verifier.Verify(ctx, path, skip)
report, err = worker.verifier.Verify(ctx, segment, skip)
if err != nil {
errlist.Add(err)
}
// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
_, err = worker.reporter.RecordAudits(ctx, report, path)
_, err = worker.reporter.RecordAudits(ctx, report)
if err != nil {
errlist.Add(err)
}

View File

@ -311,7 +311,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Audit.Queues = audit.NewQueues()
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Service,
peer.Metainfo.Metabase,
peer.Dialer,
peer.Overlay.Service,
peer.DB.Containment(),

View File

@ -202,8 +202,7 @@ type MetabaseDB interface {
// BucketEmpty returns true if bucket does not contain objects (pending or committed).
// This method doesn't check bucket existence.
BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error)
// UpdateSegmentPieces updates pieces for specified segment. If provided old pieces
// won't match current database state update will fail.
// UpdateSegmentPieces updates pieces for the specified segment. If the provided old pieces don't match the current database state, the update will fail.
UpdateSegmentPieces(ctx context.Context, opts metabase.UpdateSegmentPieces) (err error)
// TestingAllCommittedObjects gets all committed objects from bucket. Use only for testing purposes.
@ -212,7 +211,6 @@ type MetabaseDB interface {
TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error)
// TestingAllObjectSegments gets all segments for given object. Use only for testing purposes.
TestingAllObjectSegments(ctx context.Context, objectLocation metabase.ObjectLocation) (segments []metabase.Segment, err error)
// TestingAllObjects gets all objects. Use only for testing purposes.
TestingAllObjects(ctx context.Context) (segments []metabase.Object, err error)
// TestingAllSegments gets all segments. Use only for testing purposes.
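Aside, not part of the diff: a minimal sketch of the compare-and-swap behaviour described above, assuming a MetabaseDB handle and a previously fetched segment; the helper name and variables are hypothetical.

package example

import (
	"context"

	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
)

// swapFirstPiece replaces the first piece of a segment (assumed non-empty).
// OldPieces acts as a guard: if another process changed the pieces in the
// meantime, the update fails instead of overwriting the newer state.
func swapFirstPiece(ctx context.Context, db metainfo.MetabaseDB, seg metabase.Segment, replacement metabase.Piece) error {
	newPieces := append(metabase.Pieces{replacement}, seg.Pieces[1:]...)
	return db.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
		StreamID:  seg.StreamID,
		Position:  seg.Position,
		OldPieces: seg.Pieces, // must still match what is stored
		NewPieces: newPieces,
	})
}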

View File

@ -49,14 +49,14 @@ type Segment struct {
RootPieceID storj.PieceID // gc, graceful exit
Pieces metabase.Pieces // tally, audit, gc, graceful exit, repair
CreationDate time.Time // repair
expirationDate time.Time // tally, repair
ExpirationDate time.Time // tally, repair
LastRepaired time.Time // repair
Pointer *pb.Pointer // repair
}
// Expired checks if segment is expired relative to now.
func (segment *Segment) Expired(now time.Time) bool {
return !segment.expirationDate.IsZero() && segment.expirationDate.Before(now)
return !segment.ExpirationDate.IsZero() && segment.ExpirationDate.Before(now)
}
// Observer is an interface defining an observer that can subscribe to the metainfo loop.
@ -485,7 +485,7 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta
}
if expiresAt != nil {
loopSegment.expirationDate = *expiresAt
loopSegment.ExpirationDate = *expiresAt
}
loopSegment.StreamID = segment.StreamID
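Aside, not part of the diff: with ExpirationDate exported, the expiry check behaves as below; a zero value means the segment never expires.

package main

import (
	"fmt"
	"time"

	"storj.io/storj/satellite/metainfo"
)

func main() {
	now := time.Now()

	var never metainfo.Segment // zero ExpirationDate: never expires
	expired := metainfo.Segment{ExpirationDate: now.Add(-time.Hour)}

	fmt.Println(never.Expired(now), expired.Expired(now)) // false true
}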

View File

@ -137,19 +137,19 @@ func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.Buc
return nil
}
// CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
// CreateGetOrderLimits creates the order limits for downloading the pieces of a segment.
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
for i, piece := range pointer.GetRemote().GetRemotePieces() {
nodeIDs[i] = piece.NodeId
nodeIDs := make([]storj.NodeID, len(segment.Pieces))
for i, piece := range segment.Pieces {
nodeIDs[i] = piece.StorageNode
}
nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
@ -158,17 +158,17 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
signer, err := NewSignerGet(service, pointer.GetRemote().RootPieceId, time.Now(), pieceSize, bucket)
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), pieceSize, bucket)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
neededLimits := pb.NewRedundancySchemeToStorj(pointer.GetRemote().GetRedundancy()).DownloadNodes()
neededLimits := segment.Redundancy.DownloadNodes()
pieces := pointer.GetRemote().GetRemotePieces()
pieces := segment.Pieces
for _, pieceIndex := range service.perm(len(pieces)) {
piece := pieces[pieceIndex]
node, ok := nodes[piece.NodeId]
node, ok := nodes[piece.StorageNode]
if !ok {
continue
}
@ -179,9 +179,9 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
}
_, err := signer.Sign(ctx, storj.NodeURL{
ID: piece.NodeId,
ID: piece.StorageNode,
Address: address,
}, piece.PieceNum)
}, int32(piece.Number))
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
@ -207,7 +207,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
return signer.AddressedLimits, signer.PrivateKey, nil
}
// CreateGetOrderLimits2 creates the order limits for downloading the pieces of pointer.
// CreateGetOrderLimits2 creates the order limits for downloading the pieces of a segment.
func (service *Service) CreateGetOrderLimits2(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
@ -365,17 +365,13 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucket meta
return signer.AddressedLimits, signer.PrivateKey, nil
}
// CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
defer mon.Task()(&ctx)(&err)
redundancy := pointer.GetRemote().GetRedundancy()
shareSize := redundancy.GetErasureShareSize()
totalPieces := redundancy.GetTotal()
nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
for i, piece := range pointer.GetRemote().GetRemotePieces() {
nodeIDs[i] = piece.NodeId
nodeIDs := make([]storj.NodeID, len(segment.Pieces))
for i, piece := range segment.Pieces {
nodeIDs[i] = piece.StorageNode
}
nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
@ -384,43 +380,43 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
signer, err := NewSignerAudit(service, pointer.GetRemote().RootPieceId, time.Now(), int64(shareSize), bucket)
signer, err := NewSignerAudit(service, segment.RootPieceID, time.Now(), int64(segment.Redundancy.ShareSize), bucket)
if err != nil {
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
cachedIPsAndPorts = make(map[storj.NodeID]string)
var nodeErrors errs.Group
var limitsCount int32
limits := make([]*pb.AddressedOrderLimit, totalPieces)
for _, piece := range pointer.GetRemote().GetRemotePieces() {
if skip[piece.NodeId] {
var limitsCount int16
limits := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)
for _, piece := range segment.Pieces {
if skip[piece.StorageNode] {
continue
}
node, ok := nodes[piece.NodeId]
node, ok := nodes[piece.StorageNode]
if !ok {
nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
nodeErrors.Add(errs.New("node %q is not reliable", piece.StorageNode))
continue
}
address := node.Address.Address
if node.LastIPPort != "" {
cachedIPsAndPorts[piece.NodeId] = node.LastIPPort
cachedIPsAndPorts[piece.StorageNode] = node.LastIPPort
}
limit, err := signer.Sign(ctx, storj.NodeURL{
ID: piece.NodeId,
ID: piece.StorageNode,
Address: address,
}, piece.PieceNum)
}, int32(piece.Number))
if err != nil {
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
limits[piece.GetPieceNum()] = limit
limits[piece.Number] = limit
limitsCount++
}
if limitsCount < redundancy.GetMinReq() {
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq())
if limitsCount < segment.Redundancy.RequiredShares {
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, segment.Redundancy.RequiredShares)
return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
}
@ -435,8 +431,8 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
return limits, signer.PrivateKey, cachedIPsAndPorts, nil
}
// CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer.
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
// CreateAuditOrderLimit creates an order limit for auditing a single the piece from a segment.
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
// TODO reduce number of params ?
defer mon.Task()(&ctx)(&err)
@ -462,7 +458,7 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba
orderLimit, err := signer.Sign(ctx, storj.NodeURL{
ID: nodeID,
Address: node.Address.Address,
}, pieceNum)
}, int32(pieceNum))
if err != nil {
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
}
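
A similar sketch for the single-piece CreateAuditOrderLimit above, as it might be called for a contained node's pending audit. The piece number now arrives as the metabase's uint16 and is narrowed to int32 only at signing time; the helper and package names are again assumptions.

package auditexample // hypothetical package, for illustration only

import (
	"context"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/storj/satellite/metainfo/metabase"
	"storj.io/storj/satellite/orders"
)

// requestReverifyLimit sketches requesting an order limit for one piece held
// by one node. pieceNum is the metabase piece number (uint16); the int32
// narrowing happens inside CreateAuditOrderLimit when the limit is signed.
func requestReverifyLimit(ctx context.Context, service *orders.Service, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (*pb.AddressedOrderLimit, error) {
	limit, piecePrivateKey, cachedIPAndPort, err := service.CreateAuditOrderLimit(ctx, bucket, nodeID, pieceNum, rootPieceID, shareSize)
	if err != nil {
		return nil, err
	}
	// The private key and cached address travel alongside the limit in the
	// real verifier; they are discarded here to keep the sketch minimal.
	_, _ = piecePrivateKey, cachedIPAndPort
	return limit, nil
}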

View File

@ -31,13 +31,14 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) {
)
// Setup: Upload an object and create order limits
require.NoError(t, uplinkPeer.Upload(ctx, satellitePeer, bucketName, filePath, testrand.Bytes(5*memory.KiB)))
bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
items, _, err := satellitePeer.Metainfo.Service.List(ctx, metabase.SegmentKey{}, "", true, 10, ^uint32(0))
segments, err := satellitePeer.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(items))
pointer, err := satellitePeer.Metainfo.Service.Get(ctx, metabase.SegmentKey(items[0].Path))
require.NoError(t, err)
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, pointer)
require.Equal(t, 1, len(segments))
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0])
require.NoError(t, err)
require.Equal(t, 2, len(limits))

View File

@ -172,7 +172,7 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) {
Unknown: storj.NodeIDList{unknownNodeID},
}
auditService := planet.Satellites[0].Audit
_, err := auditService.Reporter.RecordAudits(ctx, report, "")
_, err := auditService.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// success and offline nodes should not be disqualified
@ -228,7 +228,7 @@ func TestAuditSuspendDQDisabled(t *testing.T) {
Unknown: storj.NodeIDList{unknownNodeID},
}
auditService := planet.Satellites[0].Audit
_, err := auditService.Reporter.RecordAudits(ctx, report, "")
_, err := auditService.Reporter.RecordAudits(ctx, report)
require.NoError(t, err)
// successful node should not be suspended or disqualified

View File

@ -454,15 +454,15 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
satellite.Audit.Chore.Loop.TriggerWait()
queue := satellite.Audit.Queues.Fetch()
require.EqualValues(t, queue.Size(), 1)
encryptedPath, err := queue.Next()
queueSegment, err := queue.Next()
require.NoError(t, err)
// replace pointer with one that is already expired
pointer := &pb.Pointer{}
pointer.ExpirationDate = time.Now().Add(-time.Hour)
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, metabase.SegmentKey(encryptedPath))
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, queueSegment.Encode())
require.NoError(t, err)
err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, metabase.SegmentKey(encryptedPath), pointer)
err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, queueSegment.Encode(), pointer)
require.NoError(t, err)
// Verify that the segment is on the repair queue

View File

@ -14,6 +14,7 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/satellitedb/dbx"
)
@ -51,13 +52,13 @@ func (containment *containment) IncrementPending(ctx context.Context, pendingAud
VALUES (?, ?, ?, ?, ?, ?, ?)`,
)
_, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes(), pendingAudit.PieceID.Bytes(), pendingAudit.StripeIndex,
pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Path))
pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Segment.Encode()))
if err != nil {
return err
}
case nil:
if !bytes.Equal(existingAudit.ExpectedShareHash, pendingAudit.ExpectedShareHash) {
containment.db.log.Info("pending audit already exists", zap.String("node id", pendingAudit.NodeID.String()), zap.Binary("segment", []byte(pendingAudit.Path)))
containment.db.log.Info("pending audit already exists", zap.String("node id", pendingAudit.NodeID.String()), zap.ByteString("segment", []byte(pendingAudit.Segment.Encode())))
return nil
}
statement := containment.db.Rebind(
@ -119,14 +120,19 @@ func convertDBPending(ctx context.Context, info *dbx.PendingAudits) (_ *audit.Pe
return nil, audit.ContainError.Wrap(err)
}
segment, err := metabase.ParseSegmentKey(info.Path)
if err != nil {
return nil, audit.ContainError.Wrap(err)
}
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pieceID,
StripeIndex: info.StripeIndex,
StripeIndex: int32(info.StripeIndex),
ShareSize: int32(info.ShareSize),
ExpectedShareHash: info.ExpectedShareHash,
ReverifyCount: int32(info.ReverifyCount),
Path: string(info.Path),
Segment: segment,
}
return pending, nil
}
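
The ParseSegmentKey call above is the read-side counterpart of the Encode() used by IncrementPending when a pending audit is stored. A minimal sketch of that round trip, with loc standing in for whichever metabase.SegmentLocation the pending audit carries:

package auditexample // hypothetical package, for illustration only

import "storj.io/storj/satellite/metainfo/metabase"

// roundTripSegmentKey encodes a segment location the way it is written to the
// pending_audits path column, then parses it back the way convertDBPending
// reads it, recovering the original location.
func roundTripSegmentKey(loc metabase.SegmentLocation) (metabase.SegmentLocation, error) {
	key := loc.Encode() // raw key bytes stored in the database
	return metabase.ParseSegmentKey(key)
}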