diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index c9ee70c55..6fb58b73d 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -251,6 +251,7 @@ func (observer *Observer) Object(ctx context.Context, object *metainfo.Object) ( } bucket := observer.ensureBucket(ctx, object.Location) + bucket.MetadataSize += int64(object.MetadataSize) bucket.ObjectCount++ return nil @@ -265,7 +266,6 @@ func (observer *Observer) InlineSegment(ctx context.Context, segment *metainfo.S bucket := observer.ensureBucket(ctx, segment.Location.Object()) bucket.InlineSegments++ bucket.InlineBytes += int64(segment.DataSize) - bucket.MetadataSize += int64(segment.MetadataSize) return nil } @@ -279,7 +279,6 @@ func (observer *Observer) RemoteSegment(ctx context.Context, segment *metainfo.S bucket := observer.ensureBucket(ctx, segment.Location.Object()) bucket.RemoteSegments++ bucket.RemoteBytes += int64(segment.DataSize) - bucket.MetadataSize += int64(segment.MetadataSize) // add node info minimumRequired := segment.Redundancy.RequiredShares diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go index 375cdfd7d..25b4b932c 100644 --- a/satellite/accounting/tally/tally_test.go +++ b/satellite/accounting/tally/tally_test.go @@ -13,7 +13,6 @@ import ( "storj.io/common/encryption" "storj.io/common/memory" - "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -159,63 +158,70 @@ func TestCalculateNodeAtRestData(t *testing.T) { } func TestCalculateBucketAtRestData(t *testing.T) { - var testCases = []struct { - name string - project string - segmentIndex uint32 - bucketName string - objectName string - inline bool - last bool - }{ - {"inline, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName", "mockObjectName", true, true}, - {"remote, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", 0, "mockBucketName", "mockObjectName1", false, false}, - {"last segment, same project, different bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName1", "mockObjectName2", false, true}, - {"different project", "9656af6e-2d9c-42fa-91f2-bfd516a722d1", 0, "mockBucketName", "mockObjectName", false, false}, - } - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 2, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.Combine( + testplanet.ReconfigureRS(2, 3, 4, 4), + testplanet.MaxSegmentSize(20*memory.KiB), + ), + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satellitePeer := planet.Satellites[0] - redundancyScheme := satelliteRS(t, satellitePeer) - expectedBucketTallies := make(map[metabase.BucketLocation]*accounting.BucketTally) - for _, tt := range testCases { - tt := tt // avoid scopelint error + satellite := planet.Satellites[0] - t.Run(tt.name, func(t *testing.T) { - projectID, err := uuid.FromString(tt.project) - require.NoError(t, err) + err := planet.Uplinks[0].Upload(ctx, satellite, "alpha", "inline", make([]byte, 10*memory.KiB)) + require.NoError(t, err) - // setup: create a pointer and save it to pointerDB - pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(20), tt.inline) - require.NoError(t, err) + err = planet.Uplinks[0].Upload(ctx, satellite, "alpha", "remote", make([]byte, 30*memory.KiB)) + require.NoError(t, err) - metainfo := satellitePeer.Metainfo.Service - location := metabase.SegmentLocation{ - ProjectID: projectID, - BucketName: tt.bucketName, - Position: metabase.SegmentPosition{Index: tt.segmentIndex}, - ObjectKey: metabase.ObjectKey(tt.objectName), - } - err = metainfo.Put(ctx, location.Encode(), pointer) - require.NoError(t, err) + err = planet.Uplinks[0].Upload(ctx, satellite, "beta", "remote", make([]byte, 30*memory.KiB)) + require.NoError(t, err) - bucketLocation := metabase.BucketLocation{ - ProjectID: projectID, - BucketName: tt.bucketName, - } - newTally := addBucketTally(expectedBucketTallies[bucketLocation], tt.inline, tt.last) - newTally.BucketName = tt.bucketName - newTally.ProjectID = projectID - expectedBucketTallies[bucketLocation] = newTally + err = planet.Uplinks[1].Upload(ctx, satellite, "alpha", "remote", make([]byte, 30*memory.KiB)) + require.NoError(t, err) - obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now()) - err = satellitePeer.Metainfo.Loop.Join(ctx, obs) - require.NoError(t, err) - require.Equal(t, expectedBucketTallies, obs.Bucket) - }) + objects, err := satellite.Metainfo.Metabase.TestingAllObjects(ctx) + require.NoError(t, err) + + segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx) + require.NoError(t, err) + + expectedTotal := map[metabase.BucketLocation]*accounting.BucketTally{} + ensure := func(loc metabase.BucketLocation) *accounting.BucketTally { + if t, ok := expectedTotal[loc]; ok { + return t + } + t := &accounting.BucketTally{BucketLocation: loc} + expectedTotal[loc] = t + return t } + + streamLocation := map[uuid.UUID]metabase.BucketLocation{} + for _, object := range objects { + loc := object.Location().Bucket() + streamLocation[object.StreamID] = loc + t := ensure(loc) + t.ObjectCount++ + t.MetadataSize += int64(len(object.EncryptedMetadata)) + } + for _, segment := range segments { + loc := streamLocation[segment.StreamID] + t := ensure(loc) + if len(segment.Pieces) > 0 { + t.RemoteSegments++ + t.RemoteBytes += int64(segment.EncryptedSize) + } else { + t.InlineSegments++ + t.InlineBytes += int64(segment.EncryptedSize) + } + } + require.Len(t, expectedTotal, 3) + + obs := tally.NewObserver(satellite.Log.Named("observer"), time.Now()) + err = satellite.Metainfo.Loop.Join(ctx, obs) + require.NoError(t, err) + require.Equal(t, expectedTotal, obs.Bucket) }) } @@ -223,31 +229,14 @@ func TestTallyIgnoresExpiredPointers(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satellitePeer := planet.Satellites[0] - redundancyScheme := satelliteRS(t, satellitePeer) + satellite := planet.Satellites[0] - projectID, err := uuid.FromString("9656af6e-2d9c-42fa-91f2-bfd516a722d7") - require.NoError(t, err) - bucket := "bucket" - - // setup: create an expired pointer and save it to pointerDB - pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(2), false) + now := time.Now() + err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour)) require.NoError(t, err) - pointer.ExpirationDate = time.Now().Add(-24 * time.Hour) - - metainfo := satellitePeer.Metainfo.Service - location := metabase.SegmentLocation{ - ProjectID: projectID, - BucketName: bucket, - Position: metabase.SegmentPosition{Index: metabase.LastSegmentIndex}, - ObjectKey: metabase.ObjectKey("object/name"), - } - err = metainfo.Put(ctx, location.Encode(), pointer) - require.NoError(t, err) - - obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now()) - err = satellitePeer.Metainfo.Loop.Join(ctx, obs) + obs := tally.NewObserver(satellite.Log.Named("observer"), now.Add(24*time.Hour)) + err = satellite.Metainfo.Loop.Join(ctx, obs) require.NoError(t, err) // there should be no observed buckets because all of the pointers are expired @@ -268,15 +257,11 @@ func TestTallyLiveAccounting(t *testing.T) { err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData) require.NoError(t, err) - key, err := planet.Satellites[0].Metainfo.Database.List(ctx, nil, 10) + segments, err := planet.Satellites[0].Metainfo.Metabase.TestingAllSegments(ctx) require.NoError(t, err) - require.Len(t, key, 1) + require.Len(t, segments, 1) - ptr, err := planet.Satellites[0].Metainfo.Service.Get(ctx, metabase.SegmentKey(key[0])) - require.NoError(t, err) - require.NotNil(t, ptr) - - segmentSize := ptr.GetSegmentSize() + segmentSize := int64(segments[0].EncryptedSize) tally.Loop.TriggerWait() @@ -337,88 +322,6 @@ func TestTallyEmptyProjectUpdatesLiveAccounting(t *testing.T) { }) } -// addBucketTally creates a new expected bucket tally based on the -// pointer that was just created for the test case. -func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *accounting.BucketTally { - // if there is already an existing tally for this project and bucket, then - // add the new pointer data to the existing tally - if existingTally != nil { - existingTally.MetadataSize += int64(2) - existingTally.RemoteSegments++ - existingTally.RemoteBytes += int64(20) - return existingTally - } - - // if the pointer was inline, create a tally with inline info - if inline { - return &accounting.BucketTally{ - ObjectCount: int64(1), - InlineSegments: int64(1), - InlineBytes: int64(20), - MetadataSize: int64(2), - } - } - - // if the pointer was remote, create a tally with remote info - newRemoteTally := &accounting.BucketTally{ - RemoteSegments: int64(1), - RemoteBytes: int64(20), - MetadataSize: int64(2), - } - - if last { - newRemoteTally.ObjectCount++ - } - - return newRemoteTally -} - -// makePointer creates a pointer. -func makePointer(storageNodes []*testplanet.StorageNode, rs storj.RedundancyScheme, segmentSize int64, inline bool) (*pb.Pointer, error) { - metadata, err := pb.Marshal(&pb.StreamMeta{NumberOfSegments: 1}) - if err != nil { - return nil, err - } - - if inline { - inlinePointer := &pb.Pointer{ - CreationDate: time.Now(), - Type: pb.Pointer_INLINE, - InlineSegment: make([]byte, segmentSize), - SegmentSize: segmentSize, - Metadata: metadata, - } - return inlinePointer, nil - } - - pieces := make([]*pb.RemotePiece, rs.TotalShares) - for i := range pieces { - pieces[i] = &pb.RemotePiece{ - PieceNum: int32(i), - NodeId: storageNodes[i].ID(), - } - } - - return &pb.Pointer{ - CreationDate: time.Now(), - Type: pb.Pointer_REMOTE, - Remote: &pb.RemoteSegment{ - RootPieceId: storj.PieceID{0xFF}, - Redundancy: &pb.RedundancyScheme{ - Type: pb.RedundancyScheme_RS, - MinReq: int32(rs.RequiredShares), - Total: int32(rs.TotalShares), - RepairThreshold: int32(rs.RepairShares), - SuccessThreshold: int32(rs.OptimalShares), - ErasureShareSize: rs.ShareSize, - }, - RemotePieces: pieces, - }, - SegmentSize: segmentSize, - Metadata: metadata, - }, nil -} - func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool { // The shareCount should be a value between RequiredShares and TotalShares where // RequiredShares is the min number of shares required to recover a segment and diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index 8997110d8..efba18cae 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -29,6 +29,7 @@ type Object struct { Location metabase.ObjectLocation // tally SegmentCount int // metrics LastSegment *Segment // metrics + MetadataSize int // tally expirationDate time.Time // tally } @@ -42,7 +43,6 @@ type Segment struct { Location metabase.SegmentLocation // tally, repair, graceful exit, audit StreamID uuid.UUID // audit DataSize int // tally, graceful exit - MetadataSize int // tally Inline bool // metrics Redundancy storj.RedundancyScheme // tally, graceful exit, repair RootPieceID storj.PieceID // gc, graceful exit @@ -435,6 +435,7 @@ func handleObject(ctx context.Context, observer *observerContext, location metab if observer.HandleError(observer.Object(ctx, &Object{ Location: location, SegmentCount: int(object.SegmentCount), + MetadataSize: len(object.EncryptedMetadata), expirationDate: expirationDate, LastSegment: &Segment{}, // TODO ideally would be to remove this field })) {