satellite/accounting/tally: use metabase

Change-Id: I6d49dc103a18e8a110bfa7775d53a65d208b6c2c

parent f7a31308db
commit 65b22be417
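
The change below moves metadata-size accounting from the segment callbacks to the object callback: the tally observer now adds MetadataSize once per object (via object.MetadataSize) instead of once per segment. A minimal, self-contained sketch of that accumulation logic, using simplified stand-ins for the satellite's accounting and metainfo types (names here are illustrative, not the real API):

package main

import "fmt"

// BucketTally mirrors the fields of accounting.BucketTally that this
// commit touches.
type BucketTally struct {
	ObjectCount    int64
	MetadataSize   int64
	InlineSegments int64
	InlineBytes    int64
	RemoteSegments int64
	RemoteBytes    int64
}

// Observer is a simplified stand-in for tally.Observer.
type Observer struct {
	Bucket map[string]*BucketTally
}

func (o *Observer) ensureBucket(name string) *BucketTally {
	t, ok := o.Bucket[name]
	if !ok {
		t = &BucketTally{}
		o.Bucket[name] = t
	}
	return t
}

// Object accounts metadata size and object count once per object,
// as in the new Observer.Object below.
func (o *Observer) Object(bucket string, metadataSize int) {
	t := o.ensureBucket(bucket)
	t.MetadataSize += int64(metadataSize)
	t.ObjectCount++
}

// RemoteSegment accounts only segment count and bytes; the per-segment
// MetadataSize line is the one the diff removes.
func (o *Observer) RemoteSegment(bucket string, dataSize int) {
	t := o.ensureBucket(bucket)
	t.RemoteSegments++
	t.RemoteBytes += int64(dataSize)
}

func main() {
	obs := &Observer{Bucket: map[string]*BucketTally{}}
	obs.Object("alpha", 2)
	obs.RemoteSegment("alpha", 20)
	fmt.Printf("%+v\n", *obs.Bucket["alpha"])
}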
@@ -251,6 +251,7 @@ func (observer *Observer) Object(ctx context.Context, object *metainfo.Object) (
 	}
 
 	bucket := observer.ensureBucket(ctx, object.Location)
+	bucket.MetadataSize += int64(object.MetadataSize)
 	bucket.ObjectCount++
 
 	return nil
@@ -265,7 +266,6 @@ func (observer *Observer) InlineSegment(ctx context.Context, segment *metainfo.S
 	bucket := observer.ensureBucket(ctx, segment.Location.Object())
 	bucket.InlineSegments++
 	bucket.InlineBytes += int64(segment.DataSize)
-	bucket.MetadataSize += int64(segment.MetadataSize)
 
 	return nil
 }
@@ -279,7 +279,6 @@ func (observer *Observer) RemoteSegment(ctx context.Context, segment *metainfo.S
 	bucket := observer.ensureBucket(ctx, segment.Location.Object())
 	bucket.RemoteSegments++
 	bucket.RemoteBytes += int64(segment.DataSize)
-	bucket.MetadataSize += int64(segment.MetadataSize)
 
 	// add node info
 	minimumRequired := segment.Redundancy.RequiredShares
@@ -13,7 +13,6 @@ import (
 
-	"storj.io/common/encryption"
 	"storj.io/common/memory"
 	"storj.io/common/pb"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
@@ -159,63 +158,70 @@ func TestCalculateNodeAtRestData(t *testing.T) {
 }
 
 func TestCalculateBucketAtRestData(t *testing.T) {
-	var testCases = []struct {
-		name         string
-		project      string
-		segmentIndex uint32
-		bucketName   string
-		objectName   string
-		inline       bool
-		last         bool
-	}{
-		{"inline, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName", "mockObjectName", true, true},
-		{"remote, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", 0, "mockBucketName", "mockObjectName1", false, false},
-		{"last segment, same project, different bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName1", "mockObjectName2", false, true},
-		{"different project", "9656af6e-2d9c-42fa-91f2-bfd516a722d1", 0, "mockBucketName", "mockObjectName", false, false},
-	}
-
 	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 2,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(2, 3, 4, 4),
+				testplanet.MaxSegmentSize(20*memory.KiB),
+			),
+		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		satellitePeer := planet.Satellites[0]
-		redundancyScheme := satelliteRS(t, satellitePeer)
-		expectedBucketTallies := make(map[metabase.BucketLocation]*accounting.BucketTally)
-		for _, tt := range testCases {
-			tt := tt // avoid scopelint error
+		satellite := planet.Satellites[0]
 
-			t.Run(tt.name, func(t *testing.T) {
-				projectID, err := uuid.FromString(tt.project)
-				require.NoError(t, err)
+		err := planet.Uplinks[0].Upload(ctx, satellite, "alpha", "inline", make([]byte, 10*memory.KiB))
+		require.NoError(t, err)
 
-				// setup: create a pointer and save it to pointerDB
-				pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(20), tt.inline)
-				require.NoError(t, err)
+		err = planet.Uplinks[0].Upload(ctx, satellite, "alpha", "remote", make([]byte, 30*memory.KiB))
+		require.NoError(t, err)
 
-				metainfo := satellitePeer.Metainfo.Service
-				location := metabase.SegmentLocation{
-					ProjectID:  projectID,
-					BucketName: tt.bucketName,
-					Position:   metabase.SegmentPosition{Index: tt.segmentIndex},
-					ObjectKey:  metabase.ObjectKey(tt.objectName),
-				}
-				err = metainfo.Put(ctx, location.Encode(), pointer)
-				require.NoError(t, err)
+		err = planet.Uplinks[0].Upload(ctx, satellite, "beta", "remote", make([]byte, 30*memory.KiB))
+		require.NoError(t, err)
 
-				bucketLocation := metabase.BucketLocation{
-					ProjectID:  projectID,
-					BucketName: tt.bucketName,
-				}
-				newTally := addBucketTally(expectedBucketTallies[bucketLocation], tt.inline, tt.last)
-				newTally.BucketName = tt.bucketName
-				newTally.ProjectID = projectID
-				expectedBucketTallies[bucketLocation] = newTally
+		err = planet.Uplinks[1].Upload(ctx, satellite, "alpha", "remote", make([]byte, 30*memory.KiB))
+		require.NoError(t, err)
 
-				obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
-				err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
-				require.NoError(t, err)
-				require.Equal(t, expectedBucketTallies, obs.Bucket)
-			})
-		}
+		objects, err := satellite.Metainfo.Metabase.TestingAllObjects(ctx)
+		require.NoError(t, err)
+
+		segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
+		require.NoError(t, err)
+
+		expectedTotal := map[metabase.BucketLocation]*accounting.BucketTally{}
+		ensure := func(loc metabase.BucketLocation) *accounting.BucketTally {
+			if t, ok := expectedTotal[loc]; ok {
+				return t
+			}
+			t := &accounting.BucketTally{BucketLocation: loc}
+			expectedTotal[loc] = t
+			return t
+		}
+
+		streamLocation := map[uuid.UUID]metabase.BucketLocation{}
+		for _, object := range objects {
+			loc := object.Location().Bucket()
+			streamLocation[object.StreamID] = loc
+			t := ensure(loc)
+			t.ObjectCount++
+			t.MetadataSize += int64(len(object.EncryptedMetadata))
+		}
+		for _, segment := range segments {
+			loc := streamLocation[segment.StreamID]
+			t := ensure(loc)
+			if len(segment.Pieces) > 0 {
+				t.RemoteSegments++
+				t.RemoteBytes += int64(segment.EncryptedSize)
+			} else {
+				t.InlineSegments++
+				t.InlineBytes += int64(segment.EncryptedSize)
+			}
+		}
+		require.Len(t, expectedTotal, 3)
+
+		obs := tally.NewObserver(satellite.Log.Named("observer"), time.Now())
+		err = satellite.Metainfo.Loop.Join(ctx, obs)
+		require.NoError(t, err)
+		require.Equal(t, expectedTotal, obs.Bucket)
 	})
 }
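
The rewritten test above no longer hand-crafts pointers; it uploads through real uplinks and derives the expected tallies directly from metabase state. The core derivation pattern, condensed into a standalone sketch with simplified types (string keys stand in for metabase.BucketLocation and uuid.UUID):

package main

import "fmt"

// bucketTally mirrors the accounting.BucketTally fields the test checks.
type bucketTally struct {
	ObjectCount, MetadataSize   int64
	RemoteSegments, RemoteBytes int64
	InlineSegments, InlineBytes int64
}

type object struct {
	StreamID, Bucket  string
	EncryptedMetadata []byte
}

type segment struct {
	StreamID      string
	EncryptedSize int32
	Pieces        []int // non-empty means the segment is remote
}

// expectedTallies follows the same shape as the test: objects contribute
// object count and metadata size, segments contribute byte totals, and
// each segment is attributed to a bucket through its stream's object.
func expectedTallies(objects []object, segments []segment) map[string]*bucketTally {
	total := map[string]*bucketTally{}
	ensure := func(loc string) *bucketTally {
		if t, ok := total[loc]; ok {
			return t
		}
		t := &bucketTally{}
		total[loc] = t
		return t
	}

	streamLocation := map[string]string{}
	for _, obj := range objects {
		streamLocation[obj.StreamID] = obj.Bucket
		t := ensure(obj.Bucket)
		t.ObjectCount++
		t.MetadataSize += int64(len(obj.EncryptedMetadata))
	}
	for _, seg := range segments {
		t := ensure(streamLocation[seg.StreamID])
		if len(seg.Pieces) > 0 {
			t.RemoteSegments++
			t.RemoteBytes += int64(seg.EncryptedSize)
		} else {
			t.InlineSegments++
			t.InlineBytes += int64(seg.EncryptedSize)
		}
	}
	return total
}

func main() {
	objs := []object{{StreamID: "s1", Bucket: "alpha", EncryptedMetadata: []byte("m")}}
	segs := []segment{{StreamID: "s1", EncryptedSize: 20, Pieces: []int{1, 2}}}
	fmt.Printf("%+v\n", *expectedTallies(objs, segs)["alpha"])
}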
@@ -223,31 +229,14 @@ func TestTallyIgnoresExpiredPointers(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		satellitePeer := planet.Satellites[0]
-		redundancyScheme := satelliteRS(t, satellitePeer)
+		satellite := planet.Satellites[0]
 
-		projectID, err := uuid.FromString("9656af6e-2d9c-42fa-91f2-bfd516a722d7")
-		require.NoError(t, err)
-		bucket := "bucket"
-
-		// setup: create an expired pointer and save it to pointerDB
-		pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(2), false)
+		now := time.Now()
+		err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour))
 		require.NoError(t, err)
 
-		pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
-
-		metainfo := satellitePeer.Metainfo.Service
-		location := metabase.SegmentLocation{
-			ProjectID:  projectID,
-			BucketName: bucket,
-			Position:   metabase.SegmentPosition{Index: metabase.LastSegmentIndex},
-			ObjectKey:  metabase.ObjectKey("object/name"),
-		}
-		err = metainfo.Put(ctx, location.Encode(), pointer)
-		require.NoError(t, err)
-
-		obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
-		err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
+		obs := tally.NewObserver(satellite.Log.Named("observer"), now.Add(24*time.Hour))
+		err = satellite.Metainfo.Loop.Join(ctx, obs)
 		require.NoError(t, err)
 
 		// there should be no observed buckets because all of the pointers are expired
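
Note how the updated test exercises expiration: the upload expires at now+12h, while the observer is constructed with now+24h as its reference time, so the object is already expired from the observer's point of view. A tiny sketch of that relationship (the predicate name is illustrative; the real check lives in the loop/observer code):

package main

import (
	"fmt"
	"time"
)

// expired is an illustrative version of the check the test relies on:
// an object is skipped when its expiration precedes the observer's time.
func expired(expiresAt, observedAt time.Time) bool {
	return !expiresAt.IsZero() && expiresAt.Before(observedAt)
}

func main() {
	now := time.Now()
	expiresAt := now.Add(12 * time.Hour)
	fmt.Println(expired(expiresAt, now))                   // false: still counted
	fmt.Println(expired(expiresAt, now.Add(24*time.Hour))) // true: skipped by tally
}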
@@ -268,15 +257,11 @@ func TestTallyLiveAccounting(t *testing.T) {
 		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
 		require.NoError(t, err)
 
-		key, err := planet.Satellites[0].Metainfo.Database.List(ctx, nil, 10)
+		segments, err := planet.Satellites[0].Metainfo.Metabase.TestingAllSegments(ctx)
 		require.NoError(t, err)
-		require.Len(t, key, 1)
+		require.Len(t, segments, 1)
 
-		ptr, err := planet.Satellites[0].Metainfo.Service.Get(ctx, metabase.SegmentKey(key[0]))
-		require.NoError(t, err)
-		require.NotNil(t, ptr)
-
-		segmentSize := ptr.GetSegmentSize()
+		segmentSize := int64(segments[0].EncryptedSize)
 
 		tally.Loop.TriggerWait()
@@ -337,88 +322,6 @@ func TestTallyEmptyProjectUpdatesLiveAccounting(t *testing.T) {
 	})
 }
 
-// addBucketTally creates a new expected bucket tally based on the
-// pointer that was just created for the test case.
-func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *accounting.BucketTally {
-	// if there is already an existing tally for this project and bucket, then
-	// add the new pointer data to the existing tally
-	if existingTally != nil {
-		existingTally.MetadataSize += int64(2)
-		existingTally.RemoteSegments++
-		existingTally.RemoteBytes += int64(20)
-		return existingTally
-	}
-
-	// if the pointer was inline, create a tally with inline info
-	if inline {
-		return &accounting.BucketTally{
-			ObjectCount:    int64(1),
-			InlineSegments: int64(1),
-			InlineBytes:    int64(20),
-			MetadataSize:   int64(2),
-		}
-	}
-
-	// if the pointer was remote, create a tally with remote info
-	newRemoteTally := &accounting.BucketTally{
-		RemoteSegments: int64(1),
-		RemoteBytes:    int64(20),
-		MetadataSize:   int64(2),
-	}
-
-	if last {
-		newRemoteTally.ObjectCount++
-	}
-
-	return newRemoteTally
-}
-
-// makePointer creates a pointer.
-func makePointer(storageNodes []*testplanet.StorageNode, rs storj.RedundancyScheme, segmentSize int64, inline bool) (*pb.Pointer, error) {
-	metadata, err := pb.Marshal(&pb.StreamMeta{NumberOfSegments: 1})
-	if err != nil {
-		return nil, err
-	}
-
-	if inline {
-		inlinePointer := &pb.Pointer{
-			CreationDate:  time.Now(),
-			Type:          pb.Pointer_INLINE,
-			InlineSegment: make([]byte, segmentSize),
-			SegmentSize:   segmentSize,
-			Metadata:      metadata,
-		}
-		return inlinePointer, nil
-	}
-
-	pieces := make([]*pb.RemotePiece, rs.TotalShares)
-	for i := range pieces {
-		pieces[i] = &pb.RemotePiece{
-			PieceNum: int32(i),
-			NodeId:   storageNodes[i].ID(),
-		}
-	}
-
-	return &pb.Pointer{
-		CreationDate: time.Now(),
-		Type:         pb.Pointer_REMOTE,
-		Remote: &pb.RemoteSegment{
-			RootPieceId: storj.PieceID{0xFF},
-			Redundancy: &pb.RedundancyScheme{
-				Type:             pb.RedundancyScheme_RS,
-				MinReq:           int32(rs.RequiredShares),
-				Total:            int32(rs.TotalShares),
-				RepairThreshold:  int32(rs.RepairShares),
-				SuccessThreshold: int32(rs.OptimalShares),
-				ErasureShareSize: rs.ShareSize,
-			},
-			RemotePieces: pieces,
-		},
-		SegmentSize: segmentSize,
-		Metadata:    metadata,
-	}, nil
-}
-
 func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
 	// The shareCount should be a value between RequiredShares and TotalShares where
 	// RequiredShares is the min number of shares required to recover a segment and
@@ -29,6 +29,7 @@ type Object struct {
 	Location     metabase.ObjectLocation // tally
 	SegmentCount int                     // metrics
 	LastSegment  *Segment                // metrics
+	MetadataSize int                     // tally
 	expirationDate time.Time             // tally
 }
 
@@ -42,7 +43,6 @@ type Segment struct {
 	Location    metabase.SegmentLocation // tally, repair, graceful exit, audit
 	StreamID    uuid.UUID                // audit
 	DataSize    int                      // tally, graceful exit
-	MetadataSize int                     // tally
 	Inline      bool                     // metrics
 	Redundancy  storj.RedundancyScheme   // tally, graceful exit, repair
 	RootPieceID storj.PieceID            // gc, graceful exit
@@ -435,6 +435,7 @@ func handleObject(ctx context.Context, observer *observerContext, location metab
 	if observer.HandleError(observer.Object(ctx, &Object{
 		Location:       location,
 		SegmentCount:   int(object.SegmentCount),
+		MetadataSize:   len(object.EncryptedMetadata),
 		expirationDate: expirationDate,
 		LastSegment:    &Segment{}, // TODO ideally would be to remove this field
 	})) {
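
The last hunk shows where the new Object.MetadataSize value comes from: the metainfo loop copies the length of the object's encrypted metadata into the observer-facing Object. A self-contained sketch of that hand-off, with simplified stand-in types:

package main

import "fmt"

// metabaseObject stands in for the metabase object the loop iterates over.
type metabaseObject struct {
	SegmentCount      int32
	EncryptedMetadata []byte
}

// loopObject stands in for the observer-facing Object in the diff above.
type loopObject struct {
	SegmentCount int // metrics
	MetadataSize int // tally
}

// toLoopObject mirrors the handleObject change: MetadataSize is filled
// from len(object.EncryptedMetadata), which tally then accounts per object.
func toLoopObject(obj metabaseObject) loopObject {
	return loopObject{
		SegmentCount: int(obj.SegmentCount),
		MetadataSize: len(obj.EncryptedMetadata),
	}
}

func main() {
	o := toLoopObject(metabaseObject{SegmentCount: 2, EncryptedMetadata: []byte("meta")})
	fmt.Printf("%+v\n", o)
}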