satellite/metainfo.Loop: use a parsed path for observers (#3003)

Egon Elbre 2019-09-12 13:38:49 +03:00 committed by GitHub
parent e5ac95b6e9
commit 8b668ab1f8
7 changed files with 111 additions and 65 deletions
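This change threads a parsed path through the metainfo loop: instead of handing observers the raw storj.Path taken from pointerDB, the loop now splits and validates each key once and passes a metainfo.ScopedPath carrying the project ID (parsed and as a string), the bucket name, and the original raw path. For orientation, a minimal observer against the updated interface could look like the sketch below; the bucketCounter type and its fields are hypothetical, while the interface methods and the ScopedPath fields come from the diff.

package example

import (
	"context"

	"storj.io/storj/pkg/pb"
	"storj.io/storj/satellite/metainfo"
)

// bucketCounter is a hypothetical observer that counts remote segments per
// bucket, using the parsed fields of metainfo.ScopedPath instead of splitting
// the raw pointerDB key itself.
type bucketCounter struct {
	segments map[string]int // keyed by "<project id>/<bucket name>"
}

// RemoteSegment is called by the loop for every remote segment.
func (c *bucketCounter) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	if c.segments == nil {
		c.segments = map[string]int{}
	}
	c.segments[path.ProjectIDString+"/"+path.BucketName]++
	return nil
}

// RemoteObject is called for the last segment of every remote object.
func (c *bucketCounter) RemoteObject(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	return nil
}

// InlineSegment is called for every inline segment.
func (c *bucketCounter) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	return nil
}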

View File

@@ -12,6 +12,7 @@ import (
"strconv"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
@@ -38,7 +39,9 @@ type Uplink struct {
Identity *identity.FullIdentity
Transport transport.Client
StorageNodeCount int
APIKey map[storj.NodeID]string
APIKey map[storj.NodeID]string
ProjectID map[storj.NodeID]uuid.UUID
}
// newUplinks creates and initializes uplinks; requires the peer to have at least one satellite
@@ -73,6 +76,8 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
Log: planet.log.Named(name),
Identity: identity,
StorageNodeCount: storageNodeCount,
APIKey: map[storj.NodeID]string{},
ProjectID: map[storj.NodeID]uuid.UUID{},
}
uplink.Log.Debug("id=" + identity.ID.String())
@@ -87,7 +92,6 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
},
}
apiKeys := make(map[storj.NodeID]string)
for j, satellite := range planet.Satellites {
// TODO: find a nicer way to do this
// populate satellites console with example
@@ -123,10 +127,10 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
return nil, err
}
apiKeys[satellite.ID()] = key.Serialize()
uplink.APIKey[satellite.ID()] = key.Serialize()
uplink.ProjectID[satellite.ID()] = project.ID
}
uplink.APIKey = apiKeys
planet.uplinks = append(planet.uplinks, uplink)
return uplink, nil

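Alongside APIKey, the Uplink now remembers the project that was created for it on each satellite, so tests can assert against project-scoped data without re-querying the console. A minimal helper sketch (the function name is hypothetical; the maps are the ones added above):

package example

import (
	"github.com/skyrings/skyring-common/tools/uuid"

	"storj.io/storj/internal/testplanet"
	"storj.io/storj/pkg/storj"
)

// projectAndKeyFor is a hypothetical test helper: given an uplink and a
// satellite's node ID, it returns the project provisioned for that satellite
// and the matching serialized API key.
func projectAndKeyFor(ul *testplanet.Uplink, satelliteID storj.NodeID) (uuid.UUID, string) {
	return ul.ProjectID[satelliteID], ul.APIKey[satelliteID]
}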
View File

@@ -33,22 +33,22 @@ func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector {
}
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already
func (collector *PathCollector) RemoteSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
for _, piece := range pointer.GetRemote().GetRemotePieces() {
if _, ok := collector.Reservoirs[piece.NodeId]; !ok {
collector.Reservoirs[piece.NodeId] = NewReservoir(collector.slotCount)
}
collector.Reservoirs[piece.NodeId].Sample(collector.rand, path)
collector.Reservoirs[piece.NodeId].Sample(collector.rand, path.Raw)
}
return nil
}
// RemoteObject returns nil because the audit service does not interact with remote objects
func (collector *PathCollector) RemoteObject(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (collector *PathCollector) RemoteObject(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}
// InlineSegment returns nil because we're only auditing for storage nodes for now
func (collector *PathCollector) InlineSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (collector *PathCollector) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}

View File

@@ -44,8 +44,8 @@ func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeI
}
// RemoteSegment takes a remote segment found in metainfo and adds pieces to bloom filters
func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx, path)(&err)
func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx, path.Raw)(&err)
remote := pointer.GetRemote()
pieces := remote.GetRemotePieces()
@@ -58,12 +58,12 @@ func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, path storj.
}
// RemoteObject returns nil because gc does not interact with remote objects
func (pieceTracker *PieceTracker) RemoteObject(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (pieceTracker *PieceTracker) RemoteObject(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}
// InlineSegment returns nil because we're only doing gc for storage nodes for now
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"storj.io/storj/pkg/pb"
@@ -26,9 +27,21 @@ var (
//
// architecture: Observer
type Observer interface {
RemoteSegment(context.Context, storj.Path, *pb.Pointer) error
RemoteObject(context.Context, storj.Path, *pb.Pointer) error
InlineSegment(context.Context, storj.Path, *pb.Pointer) error
RemoteSegment(context.Context, ScopedPath, *pb.Pointer) error
RemoteObject(context.Context, ScopedPath, *pb.Pointer) error
InlineSegment(context.Context, ScopedPath, *pb.Pointer) error
}
// ScopedPath contains full expanded information about the path
type ScopedPath struct {
ProjectID uuid.UUID
ProjectIDString string
BucketName string
// TODO: should these be a []byte?
// Raw is the same path as pointerDB is using.
Raw storj.Path
}
type observerContext struct {
@@ -163,7 +176,7 @@ waitformore:
// iterate over every segment in metainfo
for it.Next(ctx, &item) {
path := item.Key.String()
rawPath := item.Key.String()
pointer := &pb.Pointer{}
err = proto.Unmarshal(item.Value, pointer)
@@ -171,10 +184,28 @@ waitformore:
return LoopError.New("unexpected error unmarshalling pointer %s", err)
}
nextObservers := observers[:0]
pathElements := storj.SplitPath(rawPath)
if len(pathElements) < 3 {
return LoopError.New("invalid path %q", rawPath)
}
isLastSegment := pathElements[1] == "l"
path := ScopedPath{
Raw: rawPath,
ProjectIDString: pathElements[0],
BucketName: pathElements[2],
}
projectID, err := uuid.Parse(path.ProjectIDString)
if err != nil {
return LoopError.Wrap(err)
}
path.ProjectID = *projectID
nextObservers := observers[:0]
for _, observer := range observers {
keepObserver := handlePointer(ctx, observer, path, pointer)
keepObserver := handlePointer(ctx, observer, path, isLastSegment, pointer)
if keepObserver {
nextObservers = append(nextObservers, observer)
}
@@ -200,16 +231,13 @@ waitformore:
// handlePointer deals with a pointer for a single observer
// if there is some error on the observer, handle the error and return false. Otherwise, return true
func handlePointer(ctx context.Context, observer *observerContext, path storj.Path, pointer *pb.Pointer) bool {
pathElements := storj.SplitPath(path)
isLastSeg := len(pathElements) >= 2 && pathElements[1] == "l"
func handlePointer(ctx context.Context, observer *observerContext, path ScopedPath, isLastSegment bool, pointer *pb.Pointer) bool {
remote := pointer.GetRemote()
if remote != nil {
if observer.HandleError(observer.RemoteSegment(ctx, path, pointer)) {
return false
}
if isLastSeg {
if isLastSegment {
if observer.HandleError(observer.RemoteObject(ctx, path, pointer)) {
return false
}

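The raw keys the loop iterates over follow the pointerDB layout <project id>/<segment index>/<bucket name>/<object path>, where the index "l" marks an object's last segment (earlier segments use an index such as "s0"). As a standalone illustration of the parsing added above, a sketch; parseKey is a hypothetical helper, and the error handling and field names mirror the loop code:

package example

import (
	"fmt"

	"github.com/skyrings/skyring-common/tools/uuid"

	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/metainfo"
)

// parseKey derives a ScopedPath from a raw pointerDB key of the form
// <project id>/<segment index>/<bucket name>/<object path> and reports
// whether the key belongs to an object's last segment.
func parseKey(rawKey string) (metainfo.ScopedPath, bool, error) {
	elements := storj.SplitPath(rawKey)
	if len(elements) < 3 {
		return metainfo.ScopedPath{}, false, fmt.Errorf("invalid path %q", rawKey)
	}
	isLastSegment := elements[1] == "l"

	path := metainfo.ScopedPath{
		Raw:             rawKey,
		ProjectIDString: elements[0],
		BucketName:      elements[2],
	}

	projectID, err := uuid.Parse(path.ProjectIDString)
	if err != nil {
		return metainfo.ScopedPath{}, false, err
	}
	path.ProjectID = *projectID

	return path, isLastSegment, nil
}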
View File

@@ -22,12 +22,11 @@ import (
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
)
// TestMetainfoLoop does the following
// TestLoop does the following
// * upload 5 remote files with 1 segment
// * (TODO) upload 3 remote files with 2 segments
// * upload 2 inline files
@@ -38,7 +37,7 @@ import (
// - 5 remote segments
// - 2 inline files/segments
// - 7 unique path items
func TestMetainfoLoop(t *testing.T) {
func TestLoop(t *testing.T) {
// TODO: figure out how to configure testplanet so we can upload 2*segmentSize to get two segments
segmentSize := 8 * memory.KiB
@@ -95,16 +94,21 @@ func TestMetainfoLoop(t *testing.T) {
err := group.Wait()
require.NoError(t, err)
projectID := ul.ProjectID[satellite.ID()]
for _, obs := range []*testObserver{obs1, obs2} {
assert.EqualValues(t, 5, obs.remoteSegCount)
assert.EqualValues(t, 5, obs.remoteFileCount)
assert.EqualValues(t, 2, obs.inlineSegCount)
assert.EqualValues(t, 7, len(obs.uniquePaths))
for _, path := range obs.uniquePaths {
assert.EqualValues(t, path.BucketName, "bucket")
assert.EqualValues(t, path.ProjectID, projectID)
}
}
})
}
// TestMetainfoLoopObserverCancel does the following:
// TestLoopObserverCancel does the following:
// * upload 3 remote segments
// * hook three observers up to metainfo loop
// * let observer 1 run normally
@@ -112,7 +116,7 @@ func TestMetainfoLoop(t *testing.T) {
// * let observer 3's context be canceled
// * expect observer 1 to see all segments
// * expect observers 2 and 3 to finish with errors
func TestMetainfoLoopObserverCancel(t *testing.T) {
func TestLoopObserverCancel(t *testing.T) {
segmentSize := 8 * memory.KiB
testplanet.Run(t, testplanet.Config{
@@ -190,13 +194,13 @@ func TestMetainfoLoopObserverCancel(t *testing.T) {
})
}
// TestMetainfoLoopCancel does the following:
// TestLoopCancel does the following:
// * upload 3 remote segments
// * hook two observers up to metainfo loop
// * cancel loop context partway through
// * expect both observers to exit with an error and see fewer than 3 remote segments
// * expect that a new observer attempting to join at this point receives a loop closed error
func TestMetainfoLoopCancel(t *testing.T) {
func TestLoopCancel(t *testing.T) {
segmentSize := 8 * memory.KiB
testplanet.Run(t, testplanet.Config{
@@ -281,7 +285,7 @@ type testObserver struct {
remoteSegCount int
remoteFileCount int
inlineSegCount int
uniquePaths map[string]struct{}
uniquePaths map[string]metainfo.ScopedPath
onSegment func(context.Context) error // if set, run this during RemoteSegment()
}
@@ -290,19 +294,19 @@ func newTestObserver(onSegment func(context.Context) error) *testObserver {
remoteSegCount: 0,
remoteFileCount: 0,
inlineSegCount: 0,
uniquePaths: make(map[string]struct{}),
uniquePaths: make(map[string]metainfo.ScopedPath),
onSegment: onSegment,
}
}
func (obs *testObserver) RemoteSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) error {
func (obs *testObserver) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
obs.remoteSegCount++
if _, ok := obs.uniquePaths[path]; ok {
if _, ok := obs.uniquePaths[path.Raw]; ok {
// TODO: collect the errors and check in test
panic("Expected unique path in observer.RemoteSegment")
}
obs.uniquePaths[path] = struct{}{}
obs.uniquePaths[path.Raw] = path
if obs.onSegment != nil {
return obs.onSegment(ctx)
@@ -311,17 +315,17 @@ func (obs *testObserver) RemoteSegment(ctx context.Context, path storj.Path, poi
return nil
}
func (obs *testObserver) RemoteObject(ctx context.Context, path storj.Path, pointer *pb.Pointer) error {
func (obs *testObserver) RemoteObject(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
obs.remoteFileCount++
return nil
}
func (obs *testObserver) InlineSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) error {
func (obs *testObserver) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
obs.inlineSegCount++
if _, ok := obs.uniquePaths[path]; ok {
if _, ok := obs.uniquePaths[path.Raw]; ok {
// TODO: collect the errors and check in test
panic("Expected unique path in observer.InlineSegment")
}
obs.uniquePaths[path] = struct{}{}
obs.uniquePaths[path.Raw] = path
return nil
}

View File

@@ -223,7 +223,7 @@ type checkerObserver struct {
log *zap.Logger
}
func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx)(&err)
obs.monStats.remoteSegmentsChecked++
@@ -252,7 +252,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path,
if numHealthy >= redundancy.MinReq && numHealthy <= redundancy.RepairThreshold && numHealthy < redundancy.SuccessThreshold {
if len(missingPieces) == 0 {
obs.log.Error("Missing pieces is zero in checker, but this should be impossible -- bad redundancy scheme:",
zap.String("path", path),
zap.String("path", path.Raw),
zap.Int32("min", redundancy.MinReq),
zap.Int32("repair", redundancy.RepairThreshold),
zap.Int32("success", redundancy.SuccessThreshold),
@@ -261,7 +261,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path,
}
obs.monStats.remoteSegmentsNeedingRepair++
err = obs.repairQueue.Insert(ctx, &pb.InjuredSegment{
Path: []byte(path),
Path: []byte(path.Raw),
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
})
@@ -271,19 +271,22 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path,
}
// Delete always returns nil when something was deleted and also when the element didn't exist
err = obs.irrdb.Delete(ctx, []byte(path))
err = obs.irrdb.Delete(ctx, []byte(path.Raw))
if err != nil {
obs.log.Error("error deleting entry from irreparable db", zap.Error(err))
return nil
}
} else if numHealthy < redundancy.MinReq && numHealthy < redundancy.RepairThreshold {
pathElements := storj.SplitPath(path)
// TODO: see whether this can be handled with metainfo.ScopedPath
pathElements := storj.SplitPath(path.Raw)
// check to make sure there are at least *4* path elements. the first three
// are project, segment, and bucket name, but we want to make sure we're talking
// about an actual object, and that there's an object name specified
if len(pathElements) >= 4 {
project, bucketName, segmentpath := pathElements[0], pathElements[2], pathElements[3]
// TODO: is this correct? split splits all path components, but it's only using the third.
lostSegInfo := storj.JoinPaths(project, bucketName, segmentpath)
if !contains(obs.monStats.remoteSegmentInfo, lostSegInfo) {
obs.monStats.remoteSegmentInfo = append(obs.monStats.remoteSegmentInfo, lostSegInfo)
@@ -293,7 +296,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path,
obs.monStats.remoteSegmentsLost++
// make an entry into the irreparable table
segmentInfo := &pb.IrreparableSegment{
Path: []byte(path),
Path: []byte(path.Raw),
SegmentDetail: pointer,
LostPieces: int32(len(missingPieces)),
LastRepairAttempt: time.Now().Unix(),
@@ -311,7 +314,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path storj.Path,
return nil
}
func (obs *checkerObserver) RemoteObject(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (obs *checkerObserver) RemoteObject(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx)(&err)
obs.monStats.remoteFilesChecked++
@@ -319,7 +322,7 @@ func (obs *checkerObserver) RemoteObject(ctx context.Context, path storj.Path, p
return nil
}
func (obs *checkerObserver) InlineSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
func (obs *checkerObserver) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx)(&err)
return nil
}

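The two TODOs in the checker above note that re-splitting path.Raw could probably lean on the parsed ScopedPath instead. One possible shape, purely illustrative and not part of this change: take project and bucket from the parsed path and split only to recover the object name.

package example

import (
	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/metainfo"
)

// lostSegmentInfo is a hypothetical rewrite of the lostSegInfo block above:
// project and bucket come from the already-parsed ScopedPath, and only the
// object name still has to be recovered from the raw key
// (<project id>/<segment index>/<bucket name>/<object path>).
func lostSegmentInfo(path metainfo.ScopedPath) (string, bool) {
	elements := storj.SplitPath(path.Raw)
	if len(elements) < 4 {
		// no object name present, so this is not an actual object
		return "", false
	}
	// like the original, this uses only the first component of the object
	// path; the open question in the TODO above applies here as well
	return storj.JoinPaths(path.ProjectIDString, path.BucketName, elements[3]), true
}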
View File

@@ -13,7 +13,7 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/teststorj"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
@@ -26,15 +26,18 @@ func TestIdentifyInjuredSegments(t *testing.T) {
checker := planet.Satellites[0].Repair.Checker
checker.Loop.Stop()
projectID := testrand.UUID()
pointerPathPrefix := storj.JoinPaths(projectID.String(), "l", "bucket") + "/"
//add noise to metainfo before bad record
for x := 0; x < 10; x++ {
makePointer(t, planet, fmt.Sprintf("a-%d", x), false)
makePointer(t, planet, pointerPathPrefix+fmt.Sprintf("a-%d", x), false)
}
//create piece that needs repair
makePointer(t, planet, fmt.Sprintf("b"), true)
makePointer(t, planet, pointerPathPrefix+"b", true)
//add more noise to metainfo after bad record
for x := 0; x < 10; x++ {
makePointer(t, planet, fmt.Sprintf("c-%d", x), false)
makePointer(t, planet, pointerPathPrefix+fmt.Sprintf("c-%d", x), false)
}
err := checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
@@ -47,13 +50,13 @@ func TestIdentifyInjuredSegments(t *testing.T) {
require.NoError(t, err)
numValidNode := int32(len(planet.StorageNodes))
require.Equal(t, []byte("b"), injuredSegment.Path)
require.Equal(t, []byte(pointerPathPrefix+"b"), injuredSegment.Path)
require.Equal(t, len(planet.StorageNodes), len(injuredSegment.LostPieces))
for _, lostPiece := range injuredSegment.LostPieces {
// makePointer() starts with numValidNode good pieces
require.True(t, lostPiece >= numValidNode, fmt.Sprintf("%d >= %d \n", lostPiece, numValidNode))
require.True(t, lostPiece >= numValidNode, pointerPathPrefix+fmt.Sprintf("%d >= %d \n", lostPiece, numValidNode))
// makePointer() then has numValidNode bad pieces
require.True(t, lostPiece < numValidNode*2, fmt.Sprintf("%d < %d \n", lostPiece, numValidNode*2))
require.True(t, lostPiece < numValidNode*2, pointerPathPrefix+fmt.Sprintf("%d < %d \n", lostPiece, numValidNode*2))
}
})
}
@@ -86,6 +89,10 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
expectedLostPieces[int32(i)] = true
}
projectID := testrand.UUID()
pointerPath := storj.JoinPaths(projectID.String(), "l", "bucket", "piece")
pieceID := testrand.PieceID()
// when the number of healthy pieces is less than the minimum required by the redundancy scheme,
// the segment is considered irreparable and will be put into the irreparable DB
pointer := &pb.Pointer{
@@ -97,14 +104,14 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
SuccessThreshold: int32(9),
Total: int32(10),
},
RootPieceId: teststorj.PieceIDFromString("fake-piece-id"),
RootPieceId: pieceID,
RemotePieces: pieces,
},
}
// put test pointer to db
metainfo := planet.Satellites[0].Metainfo.Service
err := metainfo.Put(ctx, "fake-piece-id", pointer)
err := metainfo.Put(ctx, pointerPath, pointer)
require.NoError(t, err)
err = checker.IdentifyInjuredSegments(ctx)
@@ -117,7 +124,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
//check if the expected segments were added to the irreparable DB
irreparable := planet.Satellites[0].DB.Irreparable()
remoteSegmentInfo, err := irreparable.Get(ctx, []byte("fake-piece-id"))
remoteSegmentInfo, err := irreparable.Get(ctx, []byte(pointerPath))
require.NoError(t, err)
require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
@@ -129,7 +136,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
remoteSegmentInfo, err = irreparable.Get(ctx, []byte("fake-piece-id"))
remoteSegmentInfo, err = irreparable.Get(ctx, []byte(pointerPath))
require.NoError(t, err)
require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
@@ -147,25 +154,25 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
SuccessThreshold: int32(9),
Total: int32(10),
},
RootPieceId: teststorj.PieceIDFromString("fake-piece-id"),
RootPieceId: pieceID,
RemotePieces: pieces,
},
}
// update test pointer in db
err = metainfo.Delete(ctx, "fake-piece-id")
err = metainfo.Delete(ctx, pointerPath)
require.NoError(t, err)
err = metainfo.Put(ctx, "fake-piece-id", pointer)
err = metainfo.Put(ctx, pointerPath, pointer)
require.NoError(t, err)
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
_, err = irreparable.Get(ctx, []byte("fake-piece-id"))
_, err = irreparable.Get(ctx, []byte(pointerPath))
require.Error(t, err)
})
}
func makePointer(t *testing.T, planet *testplanet.Planet, pieceID string, createLost bool) {
func makePointer(t *testing.T, planet *testplanet.Planet, pointerPath string, createLost bool) {
ctx := context.TODO()
numOfStorageNodes := len(planet.StorageNodes)
pieces := make([]*pb.RemotePiece, 0, numOfStorageNodes)
@@ -198,12 +205,12 @@ func makePointer(t *testing.T, planet *testplanet.Planet, pieceID string, create
SuccessThreshold: int32(repairThreshold) + 1,
Total: int32(repairThreshold) + 2,
},
RootPieceId: teststorj.PieceIDFromString(pieceID),
RootPieceId: testrand.PieceID(),
RemotePieces: pieces,
},
}
// put test pointer to db
pointerdb := planet.Satellites[0].Metainfo.Service
err := pointerdb.Put(ctx, pieceID, pointer)
err := pointerdb.Put(ctx, pointerPath, pointer)
require.NoError(t, err)
}