diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index bd82dc9f0..a7ea15521 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -14,8 +14,8 @@ import ( "storj.io/common/pb" "storj.io/common/storj" + "storj.io/common/uuid" "storj.io/storj/satellite/metainfo/metabase" - "storj.io/storj/storage" ) var ( @@ -40,7 +40,7 @@ func (object *Object) Expired(now time.Time) bool { // Segment is the segment info passed to Observer by metainfo loop. type Segment struct { - Location metabase.SegmentLocation // tally, expired deletion, repair, graceful exit, audit, segment reaper + Location metabase.SegmentLocation // tally, repair, graceful exit, audit, segment reaper DataSize int // tally, graceful exit MetadataSize int // tally Inline bool // metrics, segment reaper @@ -48,9 +48,9 @@ type Segment struct { RootPieceID storj.PieceID // gc, graceful exit Pieces metabase.Pieces // tally, audit, gc, graceful exit, repair CreationDate time.Time // repair, segment reaper - expirationDate time.Time // tally, expired deletion, repair + expirationDate time.Time // tally, repair LastRepaired time.Time // repair - Pointer *pb.Pointer // expired deletion, repair + Pointer *pb.Pointer // repair MetadataNumberOfSegments int // segment reaper } @@ -156,6 +156,25 @@ func (observer *observerContext) Wait() error { return <-observer.done } +type observers []*observerContext + +func (o *observers) Remove(toRemove *observerContext) { + list := *o + for i, observer := range list { + if observer == toRemove { + list[len(list)-1], list[i] = list[i], list[len(list)-1] + *o = list[:len(list)-1] + return + } + } +} + +func (o *observers) Finish() { + for _, observer := range *o { + observer.Finish() + } +} + // LoopConfig contains configurable values for the metainfo loop. type LoopConfig struct { CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s"` @@ -178,10 +197,12 @@ type Loop struct { // NewLoop creates a new metainfo loop service. func NewLoop(config LoopConfig, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Loop { return &Loop{ - db: db, - config: config, - join: make(chan []*observerContext), - done: make(chan struct{}), + db: db, + bucketsDB: bucketsDB, + metabaseDB: metabaseDB, + config: config, + join: make(chan []*observerContext), + done: make(chan struct{}), } } @@ -274,74 +295,205 @@ func IterateDatabase(ctx context.Context, rateLimit float64, db PointerDB, bucke return iterateDatabase(ctx, db, bucketsDB, metabaseDB, obsContexts, 10000, rate.NewLimiter(rate.Limit(rateLimit), 1)) } -// handlePointer deals with a pointer for a single observer -// if there is some error on the observer, handles the error and returns false. Otherwise, returns true. -func handlePointer(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, pointer *pb.Pointer) bool { - segment := &Segment{ +// Wait waits for run to be finished. +// Safe to be called concurrently. 
+func (loop *Loop) Wait() {
+	<-loop.done
+}
+
+func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter) (err error) {
+	defer func() {
+		if err != nil {
+			for _, observer := range observers {
+				observer.HandleError(err)
+			}
+			return
+		}
+		observers.Finish()
+	}()
+
+	more := true
+	bucketsCursor := ListAllBucketsCursor{}
+	for more {
+		buckets, err := bucketsDB.ListAllBuckets(ctx, ListAllBucketsOptions{
+			Cursor: bucketsCursor,
+			Limit:  limit,
+		})
+		if err != nil {
+			return LoopError.Wrap(err)
+		}
+
+		for _, bucket := range buckets.Items {
+			// Pass the observer list by pointer so that removals done while
+			// iterating one bucket propagate to all subsequent buckets.
+			err := iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, &observers, limit, rateLimiter)
+			if err != nil {
+				return LoopError.Wrap(err)
+			}
+			if len(observers) == 0 {
+				// All observers have detached; nothing left to feed.
+				return nil
+			}
+		}
+
+		more = buckets.More
+		if more {
+			lastBucket := buckets.Items[len(buckets.Items)-1]
+			bucketsCursor.ProjectID = lastBucket.ProjectID
+			bucketsCursor.BucketName = []byte(lastBucket.Name)
+		}
+	}
+	return nil
+}
+
+func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers *observers, limit int, rateLimiter *rate.Limiter) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// TODO: improve performance here; this is just the most straightforward solution.
+
+	err = metabaseDB.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
+		ProjectID:  projectID,
+		BucketName: bucket,
+		BatchSize:  limit,
+		Recursive:  true,
+		Status:     metabase.Committed, // TODO: we should also iterate Pending objects
+	}, func(ctx context.Context, it metabase.ObjectsIterator) error {
+		var entry metabase.ObjectEntry
+		for it.Next(ctx, &entry) {
+			if err := rateLimiter.Wait(ctx); err != nil {
+				// We don't really execute concurrent batches, so we should
+				// never exceed the burst size of 1 and this should never
+				// happen. We can also enter here if the context is cancelled.
+				return err
+			}
+
+			location := metabase.ObjectLocation{
+				ProjectID:  projectID,
+				BucketName: bucket,
+				ObjectKey:  entry.ObjectKey,
+			}
+
+			// Range over a copy of the list: Remove mutates the backing
+			// slice, which is not safe while iterating over it.
+			for _, observer := range append((*observers)[:0:0], (*observers)...) {
+				keepObserver := handleObject(ctx, observer, location, entry)
+				if !keepObserver {
+					observers.Remove(observer)
+				}
+			}
+
+			if len(*observers) == 0 {
+				return nil
+			}
+
+			// If the context has been canceled, exit. Otherwise, continue.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			err = iterateSegments(ctx, entry.StreamID, projectID, bucket, entry.ObjectKey, metabaseDB, observers, limit, rateLimiter)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	return err
+}
+
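+// iterateSegments lists the segments of a single object in batches of up to
+// limit entries and passes each one to every remaining observer.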
+func iterateSegments(ctx context.Context, streamID uuid.UUID, projectID uuid.UUID, bucket string, objectKey metabase.ObjectKey, metabaseDB MetabaseDB, observers *observers, limit int, rateLimiter *rate.Limiter) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	more := true
+	cursor := metabase.SegmentPosition{}
+	for more {
+		if err := rateLimiter.Wait(ctx); err != nil {
+			// We don't really execute concurrent batches, so we should
+			// never exceed the burst size of 1 and this should never
+			// happen. We can also enter here if the context is cancelled.
+			return err
+		}
+
+		segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{
+			StreamID: streamID,
+			Cursor:   cursor,
+			Limit:    limit,
+		})
+		if err != nil {
+			return err
+		}
+
+		for _, segment := range segments.Segments {
+			location := metabase.SegmentLocation{
+				ProjectID:  projectID,
+				BucketName: bucket,
+				ObjectKey:  objectKey,
+				Index:      int64(segment.Position.Index),
+			}
+
+			// Range over a copy of the list: Remove mutates the backing
+			// slice, which is not safe while iterating over it.
+			for _, observer := range append((*observers)[:0:0], (*observers)...) {
+				keepObserver := handleSegment(ctx, observer, location, segment)
+				if !keepObserver {
+					observers.Remove(observer)
+				}
+			}
+
+			if len(*observers) == 0 {
+				return nil
+			}
+
+			// If the context has been canceled, exit. Otherwise, continue.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+		}
+
+		more = segments.More
+		if more {
+			lastSegment := segments.Segments[len(segments.Segments)-1]
+			cursor = lastSegment.Position
+		}
+	}
+
+	return nil
+}
+
+func handleObject(ctx context.Context, observer *observerContext, location metabase.ObjectLocation, object metabase.ObjectEntry) bool {
+	expirationDate := time.Time{}
+	if object.ExpiresAt != nil {
+		expirationDate = *object.ExpiresAt
+	}
+
+	if observer.HandleError(observer.Object(ctx, &Object{
 		Location:       location,
-		MetadataSize:   len(pointer.Metadata),
-		CreationDate:   pointer.CreationDate,
-		LastRepaired:   pointer.LastRepaired,
-		Pointer:        pointer,
-		expirationDate: pointer.ExpirationDate,
-	}
-
-	if location.IsLast() {
-		streamMeta := pb.StreamMeta{}
-		err := pb.Unmarshal(pointer.Metadata, &streamMeta)
-		if observer.HandleError(LoopError.Wrap(err)) {
-			return false
-		}
-		segment.MetadataNumberOfSegments = int(streamMeta.NumberOfSegments)
-	}
-
-	switch pointer.GetType() {
-	case pb.Pointer_REMOTE:
-		switch {
-		case pointer.Remote == nil:
-			observer.HandleError(LoopError.New("no remote segment specified"))
-			return false
-		case pointer.Remote.RemotePieces == nil:
-			observer.HandleError(LoopError.New("no remote segment pieces specified"))
-			return false
-		case pointer.Remote.Redundancy == nil:
-			observer.HandleError(LoopError.New("no redundancy scheme specified"))
-			return false
-		}
-		segment.DataSize = int(pointer.SegmentSize)
-		segment.RootPieceID = pointer.Remote.RootPieceId
-		segment.Redundancy = storj.RedundancyScheme{
-			Algorithm:      storj.ReedSolomon,
-			RequiredShares: int16(pointer.Remote.Redundancy.MinReq),
-			RepairShares:   int16(pointer.Remote.Redundancy.RepairThreshold),
-			OptimalShares:  int16(pointer.Remote.Redundancy.SuccessThreshold),
-			TotalShares:    int16(pointer.Remote.Redundancy.Total),
-			ShareSize:      pointer.Remote.Redundancy.ErasureShareSize,
-		}
-		segment.Pieces = make(metabase.Pieces, len(pointer.Remote.RemotePieces))
-		for i, piece := range pointer.Remote.RemotePieces {
-			segment.Pieces[i].Number = uint16(piece.PieceNum)
-			segment.Pieces[i].StorageNode = piece.NodeId
-		}
-		if observer.HandleError(observer.RemoteSegment(ctx, segment)) {
-			return false
-		}
-	case pb.Pointer_INLINE:
-		segment.DataSize = len(pointer.InlineSegment)
-		segment.Inline = true
-		if observer.HandleError(observer.InlineSegment(ctx, segment)) {
-			return false
-		}
-	default:
+		SegmentCount:   int(object.SegmentCount),
+		expirationDate: expirationDate,
+		LastSegment:    &Segment{}, // TODO: ideally this field would be removed
+	})) {
 		return false
 	}
-	if location.IsLast() {
-		if observer.HandleError(observer.Object(ctx, &Object{
-			Location:       location.Object(),
-			SegmentCount:   segment.MetadataNumberOfSegments,
-			LastSegment:    segment,
-			expirationDate: segment.expirationDate,
-		})) {
+
+	select {
+	case <-observer.ctx.Done():
+		observer.HandleError(observer.ctx.Err())
+		return false
+	default:
+	}
+
+	return true
+}
+
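+// handleSegment passes a single segment to one observer and reports whether
+// the observer should be kept; false means the observer errored or stopped
+// waiting and must be removed from the loop.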
+func handleSegment(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, segment metabase.Segment) bool {
+	loopSegment := &Segment{
+		Location: location,
+	}
+
+	loopSegment.DataSize = int(segment.EncryptedSize) // TODO: should this be the plain or the encrypted size?
+	if segment.Inline() {
+		loopSegment.Inline = true
+		if observer.HandleError(observer.InlineSegment(ctx, loopSegment)) {
+			return false
+		}
+	} else {
+		loopSegment.RootPieceID = segment.RootPieceID
+		loopSegment.Redundancy = segment.Redundancy
+		loopSegment.Pieces = segment.Pieces
+		if observer.HandleError(observer.RemoteSegment(ctx, loopSegment)) {
 			return false
 		}
 	}
@@ -356,78 +508,6 @@ func handlePointer(ctx context.Context, observer *observerContext, location meta
 	return true
 }
 
-// Wait waits for run to be finished.
-// Safe to be called concurrently.
-func (loop *Loop) Wait() {
-	<-loop.done
-}
-
-func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
-	defer func() {
-		if err != nil {
-			for _, observer := range observers {
-				observer.HandleError(err)
-			}
-			return
-		}
-		finishObservers(observers)
-	}()
-
-	err = db.IterateWithoutLookupLimit(ctx, storage.IterateOptions{
-		Recurse: true,
-		Limit:   limit,
-	}, func(ctx context.Context, it storage.Iterator) error {
-		var item storage.ListItem
-
-		// iterate over every segment in metainfo
-	nextSegment:
-		for it.Next(ctx, &item) {
-			if err := rateLimiter.Wait(ctx); err != nil {
-				// We don't really execute concurrent batches so we should never
-				// exceed the burst size of 1 and this should never happen.
-				// We can also enter here if the context is cancelled.
-				return LoopError.Wrap(err)
-			}
-
-			rawPath := item.Key.String()
-			pointer := &pb.Pointer{}
-
-			err := pb.Unmarshal(item.Value, pointer)
-			if err != nil {
-				return LoopError.New("unexpected error unmarshalling pointer %s", err)
-			}
-
-			location, err := metabase.ParseSegmentKey(metabase.SegmentKey(rawPath))
-			if err != nil {
-				// TODO should we log error here
-				continue nextSegment
-			}
-
-			nextObservers := observers[:0]
-			for _, observer := range observers {
-				keepObserver := handlePointer(ctx, observer, location, pointer)
-				if keepObserver {
-					nextObservers = append(nextObservers, observer)
-				}
-			}
-
-			observers = nextObservers
-			if len(observers) == 0 {
-				return nil
-			}
-
-			// if context has been canceled exit. Otherwise, continue
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			default:
-			}
-		}
-		return nil
-	})
-	return err
-}
-
 func finishObservers(observers []*observerContext) {
 	for _, observer := range observers {
 		observer.Finish()
diff --git a/satellite/metainfo/metabase/common.go b/satellite/metainfo/metabase/common.go
index 3bff6155d..ec3618f69 100644
--- a/satellite/metainfo/metabase/common.go
+++ b/satellite/metainfo/metabase/common.go
@@ -133,7 +133,7 @@ type SegmentKey []byte
 type SegmentLocation struct {
 	ProjectID  uuid.UUID
 	BucketName string
-	Index      int64
+	Index      int64 // TODO: refactor to SegmentPosition
 	ObjectKey  ObjectKey
 }
diff --git a/satellite/metainfo/metabase/get.go b/satellite/metainfo/metabase/get.go
index 8c560d369..b5759734b 100644
--- a/satellite/metainfo/metabase/get.go
+++ b/satellite/metainfo/metabase/get.go
@@ -20,6 +20,11 @@ type Object RawObject
 // TODO define separated struct.
 type Segment RawSegment
 
+// Inline returns true if the segment is inline.
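+// An inline segment stores its data directly in the metabase record instead
+// of on storage nodes, so it has no redundancy scheme and no remote pieces.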
+func (s Segment) Inline() bool {
+	// Rely on the absence of remote pieces instead of the presence of inline
+	// data, so that a zero-length inline segment is classified correctly.
+	return s.Redundancy.IsZero() && len(s.Pieces) == 0
+}
+
 // GetObjectExactVersion contains arguments necessary for fetching
 // information about an exact object version.
 type GetObjectExactVersion struct {
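
For context, an observer of this loop only has to implement the three callbacks that handleObject and handleSegment invoke above. The sketch below counts objects and segments; the Observer interface itself is defined outside this diff, so its shape here is inferred from those call sites, and all names are hypothetical.

```go
package loopexample

import (
	"context"

	"storj.io/storj/satellite/metainfo"
)

// segmentCounter is a hypothetical observer that tallies what the metainfo
// loop feeds to it. It assumes the Observer interface has the shape implied
// by the call sites above: Object, InlineSegment and RemoteSegment callbacks,
// each returning an error.
type segmentCounter struct {
	objects int
	inline  int
	remote  int
}

// Object is called once per committed object (see handleObject).
func (c *segmentCounter) Object(ctx context.Context, object *metainfo.Object) error {
	c.objects++
	return nil
}

// InlineSegment is called for segments stored directly in the metabase.
func (c *segmentCounter) InlineSegment(ctx context.Context, segment *metainfo.Segment) error {
	c.inline++
	return nil
}

// RemoteSegment is called for segments stored on storage nodes.
func (c *segmentCounter) RemoteSegment(ctx context.Context, segment *metainfo.Segment) error {
	c.remote++
	return nil
}
```

Returning a non-nil error from any callback is routed through observerContext.HandleError, which detaches the observer from the rest of the iteration, as handleObject and handleSegment show.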