satellite/repair: use metabase.SegmentKey type in repair package
Another change which is a part of refactoring to replace path parameter (string/[]byte) with key paramter (metabase.SegmentKey) Change-Id: I617878442442e5d59bbe5c995f913c3c93c16928
This commit is contained in:
parent
c753d17e8f
commit
27a9d14e2a
@ -161,7 +161,7 @@ func containsObjectLocation(a []metabase.ObjectLocation, x metabase.ObjectLocati
|
||||
return false
|
||||
}
|
||||
|
||||
func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, pointer *pb.Pointer, path string) (err error) {
|
||||
func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, pointer *pb.Pointer, key metabase.SegmentKey) (err error) {
|
||||
// TODO figure out how to reduce duplicate code between here and checkerObs.RemoteSegment
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
remote := pointer.GetRemote()
|
||||
@ -196,7 +196,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
|
||||
// keep it in the irreparabledb queue either.
|
||||
if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
|
||||
_, err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{
|
||||
Path: []byte(path),
|
||||
Path: key,
|
||||
LostPieces: missingPieces,
|
||||
InsertedTime: time.Now().UTC(),
|
||||
}, int(numHealthy))
|
||||
@ -205,7 +205,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
|
||||
}
|
||||
|
||||
// delete always returns nil when something was deleted and also when element didn't exists
|
||||
err = checker.irrdb.Delete(ctx, []byte(path))
|
||||
err = checker.irrdb.Delete(ctx, key)
|
||||
if err != nil {
|
||||
checker.logger.Error("error deleting entry from irreparable db: ", zap.Error(err))
|
||||
}
|
||||
@ -213,7 +213,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
|
||||
|
||||
// make an entry into the irreparable table
|
||||
segmentInfo := &pb.IrreparableSegment{
|
||||
Path: []byte(path),
|
||||
Path: key,
|
||||
SegmentDetail: pointer,
|
||||
LostPieces: int32(len(missingPieces)),
|
||||
LastRepairAttempt: time.Now().Unix(),
|
||||
@ -226,7 +226,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
|
||||
return errs.Combine(Error.New("error handling irreparable segment to queue"), err)
|
||||
}
|
||||
} else if numHealthy > repairThreshold || numHealthy >= redundancy.SuccessThreshold {
|
||||
err = checker.irrdb.Delete(ctx, []byte(path))
|
||||
err = checker.irrdb.Delete(ctx, key)
|
||||
if err != nil {
|
||||
return Error.New("error removing segment from irreparable queue: %v", err)
|
||||
}
|
||||
@ -373,10 +373,10 @@ func (obs *checkerObserver) InlineSegment(ctx context.Context, location metabase
|
||||
func (checker *Checker) IrreparableProcess(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
const limit = 1000
|
||||
lastSeenSegmentPath := []byte{}
|
||||
var lastSeenSegmentKey metabase.SegmentKey
|
||||
|
||||
for {
|
||||
segments, err := checker.irrdb.GetLimited(ctx, limit, lastSeenSegmentPath)
|
||||
segments, err := checker.irrdb.GetLimited(ctx, limit, lastSeenSegmentKey)
|
||||
if err != nil {
|
||||
return errs.Combine(Error.New("error reading segment from the queue"), err)
|
||||
}
|
||||
@ -386,10 +386,10 @@ func (checker *Checker) IrreparableProcess(ctx context.Context) (err error) {
|
||||
break
|
||||
}
|
||||
|
||||
lastSeenSegmentPath = segments[len(segments)-1].Path
|
||||
lastSeenSegmentKey = metabase.SegmentKey(segments[len(segments)-1].Path)
|
||||
|
||||
for _, segment := range segments {
|
||||
err = checker.updateIrreparableSegmentStatus(ctx, segment.GetSegmentDetail(), string(segment.GetPath()))
|
||||
err = checker.updateIrreparableSegmentStatus(ctx, segment.GetSegmentDetail(), metabase.SegmentKey(segment.GetPath()))
|
||||
if err != nil {
|
||||
checker.logger.Error("irrepair segment checker failed: ", zap.Error(err))
|
||||
}
|
||||
|
@ -105,8 +105,6 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
expectedLostPieces[int32(i)] = true
|
||||
}
|
||||
|
||||
projectID := testrand.UUID()
|
||||
pointerPath := storj.JoinPaths(projectID.String(), "l", "bucket", "piece")
|
||||
pieceID := testrand.PieceID()
|
||||
|
||||
// when number of healthy piece is less than minimum required number of piece in redundancy,
|
||||
@ -127,13 +125,24 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
projectID := testrand.UUID()
|
||||
pointerLocation := metabase.SegmentLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: "bucket",
|
||||
Index: metabase.LastSegmentIndex,
|
||||
ObjectKey: "piece",
|
||||
}
|
||||
|
||||
pointerKey := pointerLocation.Encode()
|
||||
pointerLocation.ObjectKey += "-expired"
|
||||
pointerExpiredKey := pointerLocation.Encode()
|
||||
// put test pointer to db
|
||||
metainfo := planet.Satellites[0].Metainfo.Service
|
||||
err := metainfo.Put(ctx, metabase.SegmentKey(pointerPath), pointer)
|
||||
err := metainfo.Put(ctx, pointerKey, pointer)
|
||||
require.NoError(t, err)
|
||||
// modify pointer to make it expired and put to db
|
||||
pointer.ExpirationDate = time.Now().Add(-time.Hour)
|
||||
err = metainfo.Put(ctx, metabase.SegmentKey(pointerPath+"-expired"), pointer)
|
||||
err = metainfo.Put(ctx, pointerExpiredKey, pointer)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = checker.IdentifyInjuredSegments(ctx)
|
||||
@ -146,10 +155,10 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
|
||||
//check if the expected segments were added to the irreparable DB
|
||||
irreparable := planet.Satellites[0].DB.Irreparable()
|
||||
remoteSegmentInfo, err := irreparable.Get(ctx, []byte(pointerPath))
|
||||
remoteSegmentInfo, err := irreparable.Get(ctx, pointerKey)
|
||||
require.NoError(t, err)
|
||||
// check that the expired segment was not added to the irreparable DB
|
||||
_, err = irreparable.Get(ctx, []byte(pointerPath+"-expired"))
|
||||
_, err = irreparable.Get(ctx, pointerExpiredKey)
|
||||
require.Error(t, err)
|
||||
|
||||
require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
|
||||
@ -161,7 +170,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
err = checker.IdentifyInjuredSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteSegmentInfo, err = irreparable.Get(ctx, []byte(pointerPath))
|
||||
remoteSegmentInfo, err = irreparable.Get(ctx, pointerKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
|
||||
@ -186,15 +195,15 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
},
|
||||
}
|
||||
// update test pointer in db
|
||||
err = metainfo.UnsynchronizedDelete(ctx, metabase.SegmentKey(pointerPath))
|
||||
err = metainfo.UnsynchronizedDelete(ctx, pointerKey)
|
||||
require.NoError(t, err)
|
||||
err = metainfo.Put(ctx, metabase.SegmentKey(pointerPath), pointer)
|
||||
err = metainfo.Put(ctx, pointerKey, pointer)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = checker.IdentifyInjuredSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = irreparable.Get(ctx, []byte(pointerPath))
|
||||
_, err = irreparable.Get(ctx, pointerKey)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
// DB stores information about repairs that have failed.
|
||||
@ -15,10 +16,10 @@ import (
|
||||
type DB interface {
|
||||
// IncrementRepairAttempts increments the repair attempts.
|
||||
IncrementRepairAttempts(ctx context.Context, segmentInfo *pb.IrreparableSegment) error
|
||||
// Get returns irreparable segment info based on segmentPath.
|
||||
Get(ctx context.Context, segmentPath []byte) (*pb.IrreparableSegment, error)
|
||||
// Get returns irreparable segment info based on segmentKey.
|
||||
Get(ctx context.Context, segmentKey metabase.SegmentKey) (*pb.IrreparableSegment, error)
|
||||
// GetLimited returns a list of irreparable segment info starting after the last segment info we retrieved
|
||||
GetLimited(ctx context.Context, limit int, lastSeenSegmentPath []byte) ([]*pb.IrreparableSegment, error)
|
||||
// Delete removes irreparable segment info based on segmentPath.
|
||||
Delete(ctx context.Context, segmentPath []byte) error
|
||||
GetLimited(ctx context.Context, limit int, lastSeenSegmentKey metabase.SegmentKey) ([]*pb.IrreparableSegment, error)
|
||||
// Delete removes irreparable segment info based on segmentKey.
|
||||
Delete(ctx context.Context, segmentKey metabase.SegmentKey) error
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
"storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
@ -57,9 +58,9 @@ func (db *irreparableDB) IncrementRepairAttempts(ctx context.Context, segmentInf
|
||||
}
|
||||
|
||||
// Get a irreparable's segment info from the db.
|
||||
func (db *irreparableDB) Get(ctx context.Context, segmentPath []byte) (resp *pb.IrreparableSegment, err error) {
|
||||
func (db *irreparableDB) Get(ctx context.Context, segmentKey metabase.SegmentKey) (resp *pb.IrreparableSegment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
dbxInfo, err := db.db.Get_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentPath))
|
||||
dbxInfo, err := db.db.Get_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentKey))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -81,14 +82,14 @@ func (db *irreparableDB) Get(ctx context.Context, segmentPath []byte) (resp *pb.
|
||||
}
|
||||
|
||||
// GetLimited returns a list of irreparable segment info starting after the last segment info we retrieved.
|
||||
func (db *irreparableDB) GetLimited(ctx context.Context, limit int, lastSeenSegmentPath []byte) (resp []*pb.IrreparableSegment, err error) {
|
||||
func (db *irreparableDB) GetLimited(ctx context.Context, limit int, lastSeenSegmentKey metabase.SegmentKey) (resp []*pb.IrreparableSegment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
// the offset is hardcoded to 0 since we are using the lastSeenSegmentPath to
|
||||
// indicate the item we last listed instead. In a perfect world this db query would
|
||||
// not take an offset as an argument, but currently dbx only supports `limitoffset`
|
||||
const offset = 0
|
||||
rows, err := db.db.Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Segmentpath(ctx,
|
||||
dbx.Irreparabledb_Segmentpath(lastSeenSegmentPath),
|
||||
dbx.Irreparabledb_Segmentpath(lastSeenSegmentKey),
|
||||
limit, offset,
|
||||
)
|
||||
if err != nil {
|
||||
@ -114,9 +115,9 @@ func (db *irreparableDB) GetLimited(ctx context.Context, limit int, lastSeenSegm
|
||||
}
|
||||
|
||||
// Delete a irreparable's segment info from the db.
|
||||
func (db *irreparableDB) Delete(ctx context.Context, segmentPath []byte) (err error) {
|
||||
func (db *irreparableDB) Delete(ctx context.Context, segmentKey metabase.SegmentKey) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, err = db.db.Delete_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentPath))
|
||||
_, err = db.db.Delete_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentKey))
|
||||
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user