satellite/audit: move to segmentloop
Change-Id: I10e63a1e4b6b62f5cd3098f5922ad3de1ec5af51
This commit is contained in:
parent
8ce619706b
commit
70e6cdfd06
78
satellite/audit/audit_test.go
Normal file
78
satellite/audit/audit_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
)
|
||||
|
||||
// TestAuditOrderLimit tests that while auditing, order limits without
|
||||
// specified bucket are counted correctly for storage node audit bandwidth
|
||||
// usage and the storage nodes will be paid for that.
|
||||
func TestAuditOrderLimit(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Orders.Sender.Pause()
|
||||
}
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
|
||||
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
||||
require.NoError(t, err)
|
||||
|
||||
audits.Chore.Loop.TriggerWait()
|
||||
queue := audits.Queues.Fetch()
|
||||
queueSegment, err := queue.Next()
|
||||
require.NoError(t, err)
|
||||
require.False(t, queueSegment.StreamID.IsZero())
|
||||
|
||||
_, err = audits.Verifier.Reverify(ctx, queueSegment)
|
||||
require.NoError(t, err)
|
||||
|
||||
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
|
||||
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Orders.SendOrders(ctx, now.Add(24*time.Hour))
|
||||
}
|
||||
|
||||
auditSettled := make(map[storj.NodeID]uint64)
|
||||
err = satellite.DB.StoragenodeAccounting().GetBandwidthSince(ctx, time.Time{}, func(c context.Context, sbr *accounting.StoragenodeBandwidthRollup) error {
|
||||
if sbr.Action == uint(pb.PieceAction_GET_AUDIT) {
|
||||
auditSettled[sbr.NodeID] += sbr.Settled
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, nodeID := range report.Successes {
|
||||
require.NotZero(t, auditSettled[nodeID])
|
||||
}
|
||||
})
|
||||
}
|
@ -11,7 +11,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/metabase/metaloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// Chore populates reservoirs and the audit queue.
|
||||
@ -23,20 +23,20 @@ type Chore struct {
|
||||
queues *Queues
|
||||
Loop *sync2.Cycle
|
||||
|
||||
metainfoLoop *metaloop.Service
|
||||
config Config
|
||||
segmentLoop *segmentloop.Service
|
||||
config Config
|
||||
}
|
||||
|
||||
// NewChore instantiates Chore.
|
||||
func NewChore(log *zap.Logger, queues *Queues, metaLoop *metaloop.Service, config Config) *Chore {
|
||||
func NewChore(log *zap.Logger, queues *Queues, loop *segmentloop.Service, config Config) *Chore {
|
||||
return &Chore{
|
||||
log: log,
|
||||
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
queues: queues,
|
||||
Loop: sync2.NewCycle(config.ChoreInterval),
|
||||
|
||||
metainfoLoop: metaLoop,
|
||||
config: config,
|
||||
segmentLoop: loop,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,9 +53,9 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
collector := NewCollector(chore.config.Slots, chore.rand)
|
||||
err = chore.metainfoLoop.Join(ctx, collector)
|
||||
err = chore.segmentLoop.Join(ctx, collector)
|
||||
if err != nil {
|
||||
chore.log.Error("error joining metainfoloop", zap.Error(err))
|
||||
chore.log.Error("error joining segmentloop", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -8,12 +8,12 @@ import (
|
||||
"math/rand"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/metaloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ metaloop.Observer = (*Collector)(nil)
|
||||
var _ segmentloop.Observer = (*Collector)(nil)
|
||||
|
||||
// Collector uses the metainfo loop to add segments to node reservoirs.
|
||||
// Collector uses the segment loop to add segments to node reservoirs.
|
||||
type Collector struct {
|
||||
Reservoirs map[storj.NodeID]*Reservoir
|
||||
slotCount int
|
||||
@ -30,12 +30,12 @@ func NewCollector(reservoirSlots int, r *rand.Rand) *Collector {
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (collector *Collector) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
|
||||
func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
for _, piece := range segment.Pieces {
|
||||
@ -47,12 +47,7 @@ func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop
|
||||
return nil
|
||||
}
|
||||
|
||||
// Object returns nil because the audit service does not interact with objects.
|
||||
func (collector *Collector) Object(ctx context.Context, object *metaloop.Object) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// InlineSegment returns nil because we're only auditing for storage nodes for now.
|
||||
func (collector *Collector) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ func TestAuditCollector(t *testing.T) {
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
observer := audit.NewCollector(4, r)
|
||||
err := satellite.Metainfo.Loop.Join(ctx, observer)
|
||||
err := satellite.Metainfo.SegmentLoop.Join(ctx, observer)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
|
@ -98,7 +98,7 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// find any non-nil limit
|
||||
@ -152,7 +152,7 @@ func TestGetSharePrefers(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
orderLimits, privateKey, _, err := testSatellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, len(orderLimits), 1)
|
||||
|
||||
|
@ -26,7 +26,7 @@ func TestQueues(t *testing.T) {
|
||||
_, err := q.Next()
|
||||
require.True(t, ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
|
||||
|
||||
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
|
||||
testQueue1 := []Segment{testSegment(), testSegment(), testSegment()}
|
||||
err = queues.Push(testQueue1)
|
||||
require.NoError(t, err)
|
||||
err = queues.WaitForSwap(ctx)
|
||||
@ -48,14 +48,14 @@ func TestQueuesPush(t *testing.T) {
|
||||
|
||||
queues := NewQueues()
|
||||
// when next queue is empty, WaitForSwap should return immediately
|
||||
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
|
||||
testQueue1 := []Segment{testSegment(), testSegment(), testSegment()}
|
||||
err := queues.Push(testQueue1)
|
||||
require.NoError(t, err)
|
||||
err = queues.WaitForSwap(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// second call to WaitForSwap should block until Fetch is called the first time
|
||||
testQueue2 := []Segment{testSegment("d"), testSegment("e")}
|
||||
testQueue2 := []Segment{testSegment(), testSegment()}
|
||||
err = queues.Push(testQueue2)
|
||||
require.NoError(t, err)
|
||||
var group errgroup.Group
|
||||
@ -87,14 +87,14 @@ func TestQueuesPushCancel(t *testing.T) {
|
||||
|
||||
queues := NewQueues()
|
||||
// when queue is empty, WaitForSwap should return immediately
|
||||
testQueue1 := []Segment{testSegment("a"), testSegment("b"), testSegment("c")}
|
||||
testQueue1 := []Segment{testSegment(), testSegment(), testSegment()}
|
||||
err := queues.Push(testQueue1)
|
||||
require.NoError(t, err)
|
||||
err = queues.WaitForSwap(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||
testQueue2 := []Segment{testSegment("d"), testSegment("e")}
|
||||
testQueue2 := []Segment{testSegment(), testSegment()}
|
||||
err = queues.Push(testQueue2)
|
||||
require.NoError(t, err)
|
||||
var group errgroup.Group
|
||||
@ -114,13 +114,11 @@ func TestQueuesPushCancel(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testSegment(objectKey string) Segment {
|
||||
func testSegment() Segment {
|
||||
return Segment{
|
||||
SegmentLocation: metabase.SegmentLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "test",
|
||||
ObjectKey: metabase.ObjectKey(objectKey),
|
||||
},
|
||||
StreamID: testrand.UUID(),
|
||||
Position: metabase.SegmentPosition{
|
||||
Index: uint32(testrand.Intn(100)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/metaloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
const maxReservoirSize = 3
|
||||
@ -50,21 +50,21 @@ func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment) {
|
||||
|
||||
// Segment is a segment to audit.
|
||||
type Segment struct {
|
||||
metabase.SegmentLocation
|
||||
StreamID uuid.UUID
|
||||
ExpirationDate time.Time
|
||||
StreamID uuid.UUID
|
||||
Position metabase.SegmentPosition
|
||||
ExpiresAt *time.Time
|
||||
}
|
||||
|
||||
// NewSegment creates a new segment to audit from a metainfo loop segment.
|
||||
func NewSegment(loopSegment *metaloop.Segment) Segment {
|
||||
func NewSegment(loopSegment *segmentloop.Segment) Segment {
|
||||
return Segment{
|
||||
SegmentLocation: loopSegment.Location,
|
||||
StreamID: loopSegment.StreamID,
|
||||
ExpirationDate: loopSegment.ExpirationDate,
|
||||
StreamID: loopSegment.StreamID,
|
||||
Position: loopSegment.Position,
|
||||
ExpiresAt: loopSegment.ExpiresAt,
|
||||
}
|
||||
}
|
||||
|
||||
// Expired checks if segment is expired relative to now.
|
||||
func (segment *Segment) Expired(now time.Time) bool {
|
||||
return !segment.ExpirationDate.IsZero() && segment.ExpirationDate.Before(now)
|
||||
return segment.ExpiresAt != nil && segment.ExpiresAt.Before(now)
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func TestReverifySuccess(t *testing.T) {
|
||||
pieces := segment.Pieces
|
||||
rootPieceID := segment.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
|
||||
@ -151,7 +151,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
|
||||
pieces := segment.Pieces
|
||||
rootPieceID := segment.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
|
||||
@ -796,7 +796,7 @@ func TestReverifyDifferentShare(t *testing.T) {
|
||||
shareSize := segment1.Redundancy.ShareSize
|
||||
rootPieceID := segment1.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
|
||||
@ -912,7 +912,7 @@ func TestReverifyExpired2(t *testing.T) {
|
||||
require.NotEqual(t, queueSegment1, queueSegment2)
|
||||
|
||||
// make sure queueSegment1 is the one with the expiration date
|
||||
if queueSegment1.ExpirationDate.IsZero() {
|
||||
if queueSegment1.ExpiresAt == nil {
|
||||
queueSegment1, queueSegment2 = queueSegment2, queueSegment1
|
||||
}
|
||||
|
||||
@ -955,7 +955,7 @@ func TestReverifyExpired2(t *testing.T) {
|
||||
shareSize := segment1.Redundancy.ShareSize
|
||||
rootPieceID := segment1.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
|
||||
@ -1051,7 +1051,7 @@ func TestReverifySlowDownload(t *testing.T) {
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
rootPieceID := segment.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), slowNode, slowPiece.Number, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
|
||||
@ -1140,7 +1140,7 @@ func TestReverifyUnknownError(t *testing.T) {
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
rootPieceID := segment.RootPieceID
|
||||
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), badNode, badPiece.Number, rootPieceID, shareSize)
|
||||
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, badNode, badPiece.Number, rootPieceID, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(badPiece.Number))
|
||||
|
@ -115,7 +115,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
|
||||
containedNodes := make(map[int]storj.NodeID)
|
||||
sharesToAudit := make(map[int]Share)
|
||||
|
||||
orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segment.Bucket(), segmentInfo, skip)
|
||||
orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segmentInfo, skip)
|
||||
if err != nil {
|
||||
return Report{}, err
|
||||
}
|
||||
@ -310,10 +310,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
|
||||
}
|
||||
|
||||
func segmentInfoString(segment Segment) string {
|
||||
return fmt.Sprintf("%s/%s/%x/%s/%d",
|
||||
segment.ProjectID.String(),
|
||||
segment.BucketName,
|
||||
segment.ObjectKey,
|
||||
return fmt.Sprintf("%s/%d",
|
||||
segment.StreamID.String(),
|
||||
segment.Position.Encode(),
|
||||
)
|
||||
@ -462,8 +459,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
|
||||
return
|
||||
}
|
||||
|
||||
// TODO verify what with empty bucket
|
||||
limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, metabase.BucketLocation{}, pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
|
||||
limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
|
||||
if err != nil {
|
||||
if overlay.ErrNodeDisqualified.Has(err) {
|
||||
ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
|
||||
|
@ -64,7 +64,7 @@ func TestDownloadSharesHappyPath(t *testing.T) {
|
||||
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
|
||||
@ -117,7 +117,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
|
||||
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// stop the first node in the segment
|
||||
@ -182,7 +182,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
|
||||
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
|
||||
@ -261,7 +261,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
|
||||
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize)
|
||||
@ -336,7 +336,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
||||
|
||||
shareSize := segment.Redundancy.ShareSize
|
||||
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, queueSegment.Bucket(), segment, nil)
|
||||
limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
||||
|
@ -129,7 +129,7 @@ func TestCreatePendingAudits(t *testing.T) {
|
||||
contained := make(map[int]storj.NodeID)
|
||||
contained[1] = testNodeID
|
||||
|
||||
segment := testSegment("test")
|
||||
segment := testSegment()
|
||||
segmentInfo := metabase.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
RootPieceID: testrand.PieceID(),
|
||||
|
@ -102,7 +102,10 @@ func (worker *Worker) process(ctx context.Context) (err error) {
|
||||
worker.limiter.Go(ctx, func() {
|
||||
err := worker.work(ctx, segment)
|
||||
if err != nil {
|
||||
worker.log.Error("audit failed", zap.ByteString("Segment", []byte(segment.Encode())), zap.Error(err))
|
||||
worker.log.Error("audit failed",
|
||||
zap.String("Segment StreamID", segment.StreamID.String()),
|
||||
zap.Uint64("Segment Position", segment.Position.Encode()),
|
||||
zap.Error(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -327,7 +327,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
|
||||
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
|
||||
peer.Audit.Queues,
|
||||
peer.Metainfo.Loop,
|
||||
peer.Metainfo.SegmentLoop,
|
||||
config,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
|
@ -226,7 +226,7 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas
|
||||
}
|
||||
|
||||
// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.
|
||||
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
|
||||
func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
nodeIDs := make([]storj.NodeID, len(segment.Pieces))
|
||||
@ -240,6 +240,7 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
|
||||
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
bucket := metabase.BucketLocation{}
|
||||
signer, err := NewSignerAudit(service, segment.RootPieceID, time.Now(), int64(segment.Redundancy.ShareSize), bucket)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
|
||||
@ -279,15 +280,12 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
|
||||
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, segment.Redundancy.RequiredShares)
|
||||
return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return limits, signer.PrivateKey, cachedIPsAndPorts, nil
|
||||
}
|
||||
|
||||
// CreateAuditOrderLimit creates an order limit for auditing a single the piece from a segment.
|
||||
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
|
||||
func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) {
|
||||
// TODO reduce number of params ?
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -305,7 +303,7 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba
|
||||
return nil, storj.PiecePrivateKey{}, "", overlay.ErrNodeOffline.New("%v", nodeID)
|
||||
}
|
||||
|
||||
signer, err := NewSignerAudit(service, rootPieceID, time.Now(), int64(shareSize), bucket)
|
||||
signer, err := NewSignerAudit(service, rootPieceID, time.Now(), int64(shareSize), metabase.BucketLocation{})
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
@ -318,10 +316,6 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
if err := service.updateBandwidth(ctx, bucket, limit); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
return orderLimit, signer.PrivateKey, node.LastIPPort, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user