diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index a4143c193..f788fdf42 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -279,8 +279,8 @@ func (observer *Observer) Object(ctx context.Context, object *metainfo.Object) ( return nil } - bucket := observer.ensureBucket(ctx, object.Location) - bucket.MetadataSize += int64(object.MetadataSize) + bucket := observer.ensureBucket(ctx, object.ObjectStream.Location()) + bucket.MetadataSize += int64(object.EncryptedMetadataSize) bucket.ObjectCount++ return nil diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index e5af71663..f0a0dce02 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -28,17 +28,11 @@ var ( ) // Object is the object info passed to Observer by metainfo loop. -type Object struct { - Location metabase.ObjectLocation // tally - StreamID uuid.UUID // metrics, repair - SegmentCount int // metrics - MetadataSize int // tally - expirationDate time.Time // tally -} +type Object metabase.LoopObjectEntry -// Expired checks if object is expired relative to now. +// Expired checks if object expired relative to now. func (object *Object) Expired(now time.Time) bool { - return !object.expirationDate.IsZero() && object.expirationDate.Before(now) + return object.ExpiresAt != nil && object.ExpiresAt.Before(now) } // Segment is the segment info passed to Observer by metainfo loop. @@ -326,21 +320,22 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs return err } - var lastObject metabase.LoopObjectEntry + var lastEntry metabase.LoopObjectEntry for _, segment := range segments.Segments { - if segment.StreamID != lastObject.StreamID { + if segment.StreamID != lastEntry.StreamID { var ok bool - lastObject, ok = objectsMap[segment.StreamID] + lastEntry, ok = objectsMap[segment.StreamID] if !ok { return errs.New("unable to find corresponding object: %v", segment.StreamID) } - delete(objectsMap, lastObject.StreamID) + delete(objectsMap, lastEntry.StreamID) // TODO should we move this directly to iterator to have object // state as close as possible to time of reading observers = withObservers(observers, func(observer *observerContext) bool { - return handleObject(ctx, observer, lastObject) + object := Object(lastEntry) + return handleObject(ctx, observer, &object) }) if len(observers) == 0 { return noObserversErr @@ -353,14 +348,14 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs } location := metabase.SegmentLocation{ - ProjectID: lastObject.ProjectID, - BucketName: lastObject.BucketName, - ObjectKey: lastObject.ObjectKey, + ProjectID: lastEntry.ProjectID, + BucketName: lastEntry.BucketName, + ObjectKey: lastEntry.ObjectKey, Position: segment.Position, } segment := segment observers = withObservers(observers, func(observer *observerContext) bool { - return handleSegment(ctx, observer, location, segment, lastObject.ExpiresAt) + return handleSegment(ctx, observer, location, segment, lastEntry.ExpiresAt) }) if len(observers) == 0 { return noObserversErr @@ -373,12 +368,12 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs } // we have now only objects without segments - for id, object := range objectsMap { + for id, entry := range objectsMap { delete(objectsMap, id) - object := object + object := Object(entry) observers = withObservers(observers, func(observer *observerContext) bool { - return handleObject(ctx, observer, object) + return handleObject(ctx, observer, &object) }) if len(observers) == 0 { return noObserversErr @@ -449,19 +444,8 @@ func withObservers(observers []*observerContext, handleObserver func(observer *o return nextObservers } -func handleObject(ctx context.Context, observer *observerContext, object metabase.LoopObjectEntry) bool { - expirationDate := time.Time{} - if object.ExpiresAt != nil { - expirationDate = *object.ExpiresAt - } - - if observer.HandleError(observer.Object(ctx, &Object{ - Location: object.Location(), - StreamID: object.StreamID, - SegmentCount: int(object.SegmentCount), - MetadataSize: len(object.EncryptedMetadata), - expirationDate: expirationDate, - })) { +func handleObject(ctx context.Context, observer *observerContext, object *Object) bool { + if observer.HandleError(observer.Object(ctx, object)) { return false } diff --git a/satellite/metainfo/loop_test.go b/satellite/metainfo/loop_test.go index f08260415..886638170 100644 --- a/satellite/metainfo/loop_test.go +++ b/satellite/metainfo/loop_test.go @@ -106,6 +106,8 @@ func TestLoop(t *testing.T) { assert.EqualValues(t, path.BucketName, "bucket") assert.EqualValues(t, path.ProjectID, projectID) } + // TODO we need better calulation + assert.NotZero(t, obs.totalMetadataSize) } }) } @@ -378,20 +380,22 @@ func TestLoopCancel(t *testing.T) { } type testObserver struct { - objectCount int - remoteSegCount int - inlineSegCount int - uniquePaths map[string]metabase.SegmentLocation - onSegment func(context.Context) error // if set, run this during RemoteSegment() + objectCount int + remoteSegCount int + inlineSegCount int + totalMetadataSize int + uniquePaths map[string]metabase.SegmentLocation + onSegment func(context.Context) error // if set, run this during RemoteSegment() } func newTestObserver(onSegment func(context.Context) error) *testObserver { return &testObserver{ - objectCount: 0, - remoteSegCount: 0, - inlineSegCount: 0, - uniquePaths: make(map[string]metabase.SegmentLocation), - onSegment: onSegment, + objectCount: 0, + remoteSegCount: 0, + inlineSegCount: 0, + totalMetadataSize: 0, + uniquePaths: make(map[string]metabase.SegmentLocation), + onSegment: onSegment, } } @@ -414,6 +418,7 @@ func (obs *testObserver) RemoteSegment(ctx context.Context, segment *metainfo.Se func (obs *testObserver) Object(ctx context.Context, object *metainfo.Object) error { obs.objectCount++ + obs.totalMetadataSize += object.EncryptedMetadataSize return nil } diff --git a/satellite/metainfo/metabase/loop.go b/satellite/metainfo/metabase/loop.go index 9bebf0dec..dc554df5c 100644 --- a/satellite/metainfo/metabase/loop.go +++ b/satellite/metainfo/metabase/loop.go @@ -9,7 +9,6 @@ import ( "github.com/zeebo/errs" - "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/private/tagsql" ) @@ -18,28 +17,10 @@ const loopIteratorBatchSizeLimit = 2500 // LoopObjectEntry contains information about object needed by metainfo loop. type LoopObjectEntry struct { - ObjectStream - - CreatedAt time.Time - ExpiresAt *time.Time - - Status ObjectStatus - SegmentCount int32 - - EncryptedMetadataNonce []byte - EncryptedMetadata []byte - EncryptedMetadataEncryptedKey []byte - - TotalPlainSize int64 - TotalEncryptedSize int64 - FixedSegmentSize int32 - - Encryption storj.EncryptionParameters - - // ZombieDeletionDeadline defines when the pending raw object should be deleted from the database. - // This is as a safeguard against objects that failed to upload and the client has not indicated - // whether they want to continue uploading or delete the already uploaded data. - ZombieDeletionDeadline *time.Time + ObjectStream // metrics, repair, tally + ExpiresAt *time.Time // tally + SegmentCount int32 // metrics + EncryptedMetadataSize int // tally } // LoopObjectsIterator iterates over a sequence of LoopObjectEntry items. @@ -164,12 +145,10 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err return it.db.db.Query(ctx, ` SELECT project_id, bucket_name, - object_key, stream_id, version, status, - created_at, expires_at, + object_key, stream_id, version, + expires_at, segment_count, - encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption + LENGTH(COALESCE(encrypted_metadata,'')) FROM objects WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC @@ -184,11 +163,9 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err func (it *loopIterator) scanItem(item *LoopObjectEntry) error { return it.curRows.Scan( &item.ProjectID, &item.BucketName, - &item.ObjectKey, &item.StreamID, &item.Version, &item.Status, - &item.CreatedAt, &item.ExpiresAt, + &item.ObjectKey, &item.StreamID, &item.Version, + &item.ExpiresAt, &item.SegmentCount, - &item.EncryptedMetadataNonce, &item.EncryptedMetadata, &item.EncryptedMetadataEncryptedKey, - &item.TotalPlainSize, &item.TotalEncryptedSize, &item.FixedSegmentSize, - encryptionParameters{&item.Encryption}, + &item.EncryptedMetadataSize, ) } diff --git a/satellite/metainfo/metabase/loop_test.go b/satellite/metainfo/metabase/loop_test.go index 93b5f6b6a..89f31c429 100644 --- a/satellite/metainfo/metabase/loop_test.go +++ b/satellite/metainfo/metabase/loop_test.go @@ -6,7 +6,6 @@ package metabase_test import ( "strings" "testing" - "time" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -51,8 +50,6 @@ func TestIterateLoopObjects(t *testing.T) { t.Run("pending and committed", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) - now := time.Now() - pending := randObjectStream() committed := randObjectStream() committed.ProjectID = pending.ProjectID @@ -93,18 +90,10 @@ func TestIterateLoopObjects(t *testing.T) { Result: []metabase.LoopObjectEntry{ { ObjectStream: pending, - CreatedAt: now, - Status: metabase.Pending, - Encryption: defaultTestEncryption, }, { - ObjectStream: committed, - CreatedAt: now, - Status: metabase.Committed, - Encryption: defaultTestEncryption, - EncryptedMetadataNonce: encryptedMetadataNonce[:], - EncryptedMetadata: encryptedMetadata, - EncryptedMetadataEncryptedKey: encryptedMetadataKey, + ObjectStream: committed, + EncryptedMetadataSize: len(encryptedMetadata), }, }, }.Check(ctx, t, db) @@ -251,15 +240,11 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab obj.ProjectID = projectID obj.BucketName = bucketName obj.ObjectKey = key - now := time.Now() createObject(ctx, t, db, obj, 0) objects[key] = metabase.LoopObjectEntry{ ObjectStream: obj, - CreatedAt: now, - Status: metabase.Committed, - Encryption: defaultTestEncryption, } } @@ -267,17 +252,8 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab } func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry { return metabase.LoopObjectEntry{ - ObjectStream: m.ObjectStream, - CreatedAt: m.CreatedAt, - ExpiresAt: m.ExpiresAt, - Status: m.Status, - SegmentCount: m.SegmentCount, - EncryptedMetadataNonce: m.EncryptedMetadataNonce, - EncryptedMetadata: m.EncryptedMetadata, - EncryptedMetadataEncryptedKey: m.EncryptedMetadataEncryptedKey, - TotalEncryptedSize: m.TotalEncryptedSize, - FixedSegmentSize: m.FixedSegmentSize, - Encryption: m.Encryption, - ZombieDeletionDeadline: m.ZombieDeletionDeadline, + ObjectStream: m.ObjectStream, + ExpiresAt: m.ExpiresAt, + SegmentCount: m.SegmentCount, } }