cmd/segment-reaper: several refactorings (#3637)

* Move the observer implementation and the type definitions related with
  it and helper functions to its own file.
* Cluster struct type is used as a key for a ObjectsMap type.
  Observer struct type has a field of ObjectsMap.
  Cluster has a field for project ID.
  Observer processSegment method uses its ObjectMap field for tracking
  objects.

  However Observer processSegment clears the map once the projectID
  diverges from the one kept in the Observer lastProjectID field, meaning
  that it isn't needed to keep the projectID as part of the ObjectMap key.

  For this reason, ObjectsMap can use just the bucket name as its key,
  and the Cluster struct isn't needed.

  Because of such change, the ObjectMap type has been renamed to a more
  descriptive name.
* Make the types defined for this specific package not being exported.
* Create a constructor function for observer to encapsulate the map
  allocation.
* Don't throw away the entire buckets objects map when an empty one
  is needed, so part of the allocations can be reused.

  Encapsulate the clearing up logic into a method.
* Make the analyzeProject function a method of observer.
This commit is contained in:
Ivan Fraixedes 2019-11-27 10:28:43 +01:00 committed by GitHub
parent a6235d3962
commit 7288d01781
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 170 additions and 146 deletions

View File

@ -4,20 +4,15 @@
package main
import (
"context"
"encoding/csv"
"os"
"strconv"
"github.com/gogo/protobuf/proto"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/metainfo"
)
@ -39,121 +34,6 @@ var (
}
)
// Cluster key for objects map.
// It pairs a project ID with a bucket name so that objects with the same
// path in different buckets or projects are tracked separately.
type Cluster struct {
	projectID string
	bucket    string
}
// Object represents object with segments.
type Object struct {
	// segments is a bitmask: bit i is set once segment index i has been seen.
	// TODO verify if we have more than 64 segments for object in network
	segments uint64

	// expectedNumberOfSegments is filled from the stream metadata carried by
	// the object's last segment, when present.
	expectedNumberOfSegments byte

	// hasLastSegment is set once the "l" segment of the object was processed.
	hasLastSegment bool
	// if skip is true then segments from this object shouldn't be treated as zombie segments
	// and printed out, e.g. when one of segments is out of specified date range
	skip bool
}
// ObjectsMap map that keeps objects representation.
// The outer key is the Cluster (project + bucket) and the inner key is the
// encrypted object path.
type ObjectsMap map[Cluster]map[storj.Path]*Object
// Observer metainfo.Loop observer for zombie reaper.
type Observer struct {
	db      metainfo.PointerDB
	objects ObjectsMap
	writer  *csv.Writer

	// lastProjectID is the project of the most recently processed segment; a
	// change of project triggers analysis of the accumulated objects.
	lastProjectID string

	// counters reported at the end of the detection run
	inlineSegments     int
	lastInlineSegments int
	remoteSegments     int
}
// RemoteSegment processes a segment to collect data needed to detect zombie segment.
// Remote and inline segments are treated identically by processSegment; the
// separate methods exist to implement the observer interface.
func (observer *Observer) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return observer.processSegment(ctx, path, pointer)
}
// InlineSegment processes a segment to collect data needed to detect zombie segment.
// It delegates to processSegment, exactly as RemoteSegment does.
func (observer *Observer) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return observer.processSegment(ctx, path, pointer)
}
// Object not used in this implementation.
// It deliberately does nothing and reports success.
func (observer *Observer) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return nil
}
// processSegment records the segment described by path into the objects map
// and updates the inline/remote counters for the final report.
//
// Segments arrive grouped by project: when the project ID changes with
// respect to the previous segment, the accumulated objects of the previous
// project are analyzed and the map is discarded to start the new project
// with a fresh one.
func (observer *Observer) processSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	cluster := Cluster{
		projectID: path.ProjectIDString,
		bucket:    path.BucketName,
	}

	if observer.lastProjectID != "" && observer.lastProjectID != cluster.projectID {
		err := analyzeProject(ctx, observer.db, observer.objects, observer.writer)
		if err != nil {
			return err
		}

		// cleanup map to free memory
		observer.objects = make(ObjectsMap)
	}

	// By convention the last segment of an object has segment key "l".
	isLastSegment := path.Segment == "l"
	object := findOrCreate(cluster, path.EncryptedObjectPath, observer.objects)
	if isLastSegment {
		object.hasLastSegment = true

		// The last segment carries the stream metadata with the expected
		// total number of segments of the object.
		streamMeta := pb.StreamMeta{}
		err := proto.Unmarshal(pointer.Metadata, &streamMeta)
		if err != nil {
			return errs.New("unexpected error unmarshalling pointer metadata %s", err)
		}

		if streamMeta.NumberOfSegments > 0 {
			if streamMeta.NumberOfSegments > int64(maxNumOfSegments) {
				// More segments than the bitmask can represent: mark the
				// object so it isn't reported as a zombie.
				object.skip = true
				zap.S().Warn("unsupported number of segments", zap.Int64("index", streamMeta.NumberOfSegments))
			}
			object.expectedNumberOfSegments = byte(streamMeta.NumberOfSegments)
		}
	} else {
		// Non-last segment keys look like "s<index>"; parse the numeric part.
		segmentIndex, err := strconv.Atoi(path.Segment[1:])
		if err != nil {
			return err
		}
		if segmentIndex >= int(maxNumOfSegments) {
			object.skip = true
			zap.S().Warn("unsupported segment index", zap.Int("index", segmentIndex))
		}

		if object.segments&(1<<uint64(segmentIndex)) != 0 {
			// TODO make path displayable
			return errs.New("fatal error this segment is duplicated: %s", path.Raw)
		}

		object.segments |= 1 << uint64(segmentIndex)
	}

	// collect number of pointers for report
	if pointer.Type == pb.Pointer_INLINE {
		observer.inlineSegments++
		if isLastSegment {
			observer.lastInlineSegments++
		}
	} else {
		observer.remoteSegments++
	}

	observer.lastProjectID = cluster.projectID

	return nil
}
func init() {
rootCmd.AddCommand(detectCmd)
@ -199,11 +79,8 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
return err
}
observer := &Observer{
objects: make(ObjectsMap),
db: db,
writer: writer,
}
observer := newObserver(db, writer)
err = metainfo.IterateDatabase(ctx, db, observer)
if err != nil {
return err
@ -214,24 +91,3 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
log.Info("number of remote segments", zap.Int("segments", observer.remoteSegments))
return nil
}
func analyzeProject(ctx context.Context, db metainfo.PointerDB, objectsMap ObjectsMap, csvWriter *csv.Writer) error {
// TODO this part will be implemented in next PR
return nil
}
// findOrCreate returns the Object tracked under the given cluster and path,
// creating the per-cluster map and the Object itself when they don't exist
// yet.
func findOrCreate(cluster Cluster, path string, objects ObjectsMap) *Object {
	pathsOfCluster, exists := objects[cluster]
	if !exists {
		pathsOfCluster = make(map[storj.Path]*Object)
		objects[cluster] = pathsOfCluster
	}

	if obj, exists := pathsOfCluster[path]; exists {
		return obj
	}

	obj := &Object{}
	pathsOfCluster[path] = obj
	return obj
}

View File

@ -0,0 +1,168 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"strconv"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/metainfo"
)
// object represents object with segments.
type object struct {
	// segments is a bitmask: bit i is set once segment index i has been seen.
	// TODO verify if we have more than 64 segments for object in network
	segments uint64

	// expectedNumberOfSegments is filled from the stream metadata carried by
	// the object's last segment, when present.
	expectedNumberOfSegments byte

	// hasLastSegment is set once the "l" segment of the object was processed.
	hasLastSegment bool
	// if skip is true then segments from this object shouldn't be treated as zombie segments
	// and printed out, e.g. when one of segments is out of specified date range
	skip bool
}
// bucketsObjects keeps a list of objects associated with their path per bucket
// name. The outer key is the bucket name and the inner key is the encrypted
// object path.
type bucketsObjects map[string]map[storj.Path]*object
// newObserver returns an observer ready for use, pre-allocating the
// per-bucket objects map so callers never have to initialize it.
func newObserver(db metainfo.PointerDB, w *csv.Writer) *observer {
	obs := &observer{
		db:     db,
		writer: w,
	}
	obs.objects = make(bucketsObjects)

	return obs
}
// observer metainfo.Loop observer for zombie reaper.
type observer struct {
	db     metainfo.PointerDB
	writer *csv.Writer

	// lastProjectID is the project of the most recently processed segment; a
	// change of project triggers analysis of the accumulated objects.
	lastProjectID string
	// objects accumulates, per bucket, the objects of the current project.
	objects bucketsObjects

	// counters reported at the end of the detection run
	inlineSegments     int
	lastInlineSegments int
	remoteSegments     int
}
// RemoteSegment processes a segment to collect data needed to detect zombie segment.
// Remote and inline segments are treated identically by processSegment; the
// separate methods exist to implement the observer interface.
func (obsvr *observer) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return obsvr.processSegment(ctx, path, pointer)
}
// InlineSegment processes a segment to collect data needed to detect zombie segment.
// It delegates to processSegment, exactly as RemoteSegment does.
func (obsvr *observer) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return obsvr.processSegment(ctx, path, pointer)
}
// Object not used in this implementation.
// It deliberately does nothing and reports success.
func (obsvr *observer) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	return nil
}
// processSegment records the segment described by path into the observer's
// per-bucket objects map and updates the inline/remote counters for the
// final report.
//
// Segments arrive grouped by project: when the project ID changes with
// respect to the previous segment, the accumulated objects of the previous
// project are analyzed and the map is cleared for reuse.
func (obsvr *observer) processSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	if obsvr.lastProjectID != "" && obsvr.lastProjectID != path.ProjectIDString {
		err := obsvr.analyzeProject(ctx)
		if err != nil {
			return err
		}

		// cleanup map to free memory
		obsvr.clearBucketsObjects()
	}

	obsvr.lastProjectID = path.ProjectIDString
	// By convention the last segment of an object has segment key "l".
	isLastSegment := path.Segment == "l"
	object := findOrCreate(path.BucketName, path.EncryptedObjectPath, obsvr.objects)
	if isLastSegment {
		object.hasLastSegment = true

		// The last segment carries the stream metadata with the expected
		// total number of segments of the object.
		streamMeta := pb.StreamMeta{}
		err := proto.Unmarshal(pointer.Metadata, &streamMeta)
		if err != nil {
			return errs.New("unexpected error unmarshalling pointer metadata %s", err)
		}

		if streamMeta.NumberOfSegments > 0 {
			if streamMeta.NumberOfSegments > int64(maxNumOfSegments) {
				// More segments than the bitmask can represent: mark the
				// object so it isn't reported as a zombie.
				object.skip = true
				// zap.S() returns a SugaredLogger, whose Warn fmt-prints its
				// arguments and mangles strongly-typed fields; use the typed
				// global logger so the field is emitted structured.
				zap.L().Warn("unsupported number of segments", zap.Int64("index", streamMeta.NumberOfSegments))
			}
			object.expectedNumberOfSegments = byte(streamMeta.NumberOfSegments)
		}
	} else {
		// Non-last segment keys look like "s<index>"; parse the numeric part.
		segmentIndex, err := strconv.Atoi(path.Segment[1:])
		if err != nil {
			return err
		}
		if segmentIndex >= int(maxNumOfSegments) {
			object.skip = true
			zap.L().Warn("unsupported segment index", zap.Int("index", segmentIndex))
		}

		if object.segments&(1<<uint64(segmentIndex)) != 0 {
			// TODO make path displayable
			return errs.New("fatal error this segment is duplicated: %s", path.Raw)
		}

		object.segments |= 1 << uint64(segmentIndex)
	}

	// collect number of pointers for report
	if pointer.Type == pb.Pointer_INLINE {
		obsvr.inlineSegments++
		if isLastSegment {
			obsvr.lastInlineSegments++
		}
	} else {
		obsvr.remoteSegments++
	}

	return nil
}
// analyzeProject analyzes the objects in the obsvr.objects field for
// detecting zombie segments and writing them to obsvr.writer.
// It is currently a stub that always succeeds.
func (obsvr *observer) analyzeProject(ctx context.Context) error {
	// TODO this part will be implemented in next PR
	return nil
}
// clearBucketsObjects empties the buckets objects map so it can be reused
// for the next project.
//
// Deleting every key instead of allocating a fresh map lets the runtime keep
// the already-grown buckets around, avoiding a destroy/recreate cycle each
// time an empty map is required.
// See https://github.com/golang/go/issues/20138
func (obsvr *observer) clearBucketsObjects() {
	for bucket := range obsvr.objects {
		delete(obsvr.objects, bucket)
	}
}
// findOrCreate returns the object stored under the given bucket name and
// path, creating the per-bucket map and the object itself when they don't
// exist yet.
func findOrCreate(bucketName string, path string, buckets bucketsObjects) *object {
	objectsOfBucket, exists := buckets[bucketName]
	if !exists {
		objectsOfBucket = make(map[storj.Path]*object)
		buckets[bucketName] = objectsOfBucket
	}

	if obj, exists := objectsOfBucket[path]; exists {
		return obj
	}

	obj := &object{}
	objectsOfBucket[path] = obj
	return obj
}