satellite/inspector: migrate to metabase

Change-Id: Ibfc12bf9bce0f9f065f4859a6818e5c18bbd526a
This commit is contained in:
Fadila Khadar 2020-12-16 16:25:27 +01:00 committed by Michal Niewrzal
parent 8d3ea9c251
commit c705237beb
3 changed files with 166 additions and 118 deletions

View File

@ -460,7 +460,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Inspector.Endpoint = inspector.NewEndpoint(
peer.Log.Named("inspector"),
peer.Overlay.Service,
peer.Metainfo.Service,
peer.Metainfo.Metabase,
)
if err := internalpb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())

View File

@ -5,7 +5,7 @@ package inspector
import (
"context"
"strconv"
"encoding/binary"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
@ -26,23 +26,21 @@ var (
Error = errs.Class("Endpoint error")
)
const lastSegmentIndex = int64(-1)
// Endpoint for checking object and segment health.
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
overlay *overlay.Service
metainfo *metainfo.Service
log *zap.Logger
overlay *overlay.Service
metabaseDB metainfo.MetabaseDB
}
// NewEndpoint will initialize an Endpoint struct.
func NewEndpoint(log *zap.Logger, cache *overlay.Service, metainfo *metainfo.Service) *Endpoint {
func NewEndpoint(log *zap.Logger, cache *overlay.Service, metabaseDB metainfo.MetabaseDB) *Endpoint {
return &Endpoint{
log: log,
overlay: cache,
metainfo: metainfo,
log: log,
overlay: cache,
metabaseDB: metabaseDB,
}
}
@ -53,56 +51,53 @@ func (endpoint *Endpoint) ObjectHealth(ctx context.Context, in *internalpb.Objec
var segmentHealthResponses []*internalpb.SegmentHealth
var redundancy *pb.RedundancyScheme
limit := int64(100)
limit := int(100)
if in.GetLimit() > 0 {
limit = int64(in.GetLimit())
limit = int(in.GetLimit())
}
var start int64
var startPosition metabase.SegmentPosition
if in.GetStartAfterSegment() > 0 {
start = in.GetStartAfterSegment() + 1
startPosition = metabase.SegmentPositionFromEncoded(uint64(in.GetStartAfterSegment()))
}
end := limit + start
if in.GetEndBeforeSegment() > 0 {
end = in.GetEndBeforeSegment()
projectID, err := uuid.FromBytes(in.GetProjectId())
if err != nil {
return nil, Error.Wrap(err)
}
bucket := in.GetBucket()
encryptedPath := in.GetEncryptedPath()
projectID := in.GetProjectId()
objectLocation := metabase.ObjectLocation{
ProjectID: projectID,
BucketName: string(in.GetBucket()),
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
}
// TODO add version field to ObjectHealthRequest?
object, err := endpoint.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: objectLocation,
})
if err != nil {
return nil, Error.Wrap(err)
}
segmentIndex := start
for segmentIndex < end {
if segmentIndex-start >= limit {
break
}
listResult, err := endpoint.metabaseDB.ListSegments(ctx, metabase.ListSegments{
StreamID: object.StreamID,
Cursor: startPosition,
Limit: limit,
})
if err != nil {
return nil, Error.Wrap(err)
}
segment := &internalpb.SegmentHealthRequest{
Bucket: bucket,
EncryptedPath: encryptedPath,
SegmentIndex: segmentIndex,
ProjectId: projectID,
}
segmentHealth, err := endpoint.SegmentHealth(ctx, segment)
if err != nil {
if segmentIndex == lastSegmentIndex {
for _, segment := range listResult.Segments {
if !segment.Inline() {
segmentHealth, err := endpoint.segmentHealth(ctx, segment)
if err != nil {
return nil, Error.Wrap(err)
}
segmentIndex = lastSegmentIndex
continue
segmentHealthResponses = append(segmentHealthResponses, segmentHealth.GetHealth())
redundancy = segmentHealth.GetRedundancy()
}
segmentHealthResponses = append(segmentHealthResponses, segmentHealth.GetHealth())
redundancy = segmentHealth.GetRedundancy()
if segmentIndex == lastSegmentIndex {
break
}
segmentIndex++
}
return &internalpb.ObjectHealthResponse{
@ -112,33 +107,48 @@ func (endpoint *Endpoint) ObjectHealth(ctx context.Context, in *internalpb.Objec
}
// SegmentHealth will check the health of a segment.
func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.SegmentHealthRequest) (resp *internalpb.SegmentHealthResponse, err error) {
func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.SegmentHealthRequest) (_ *internalpb.SegmentHealthResponse, err error) {
defer mon.Task()(&ctx)(&err)
health := &internalpb.SegmentHealth{}
projectID, err := uuid.FromString(string(in.GetProjectId()))
projectID, err := uuid.FromBytes(in.GetProjectId())
if err != nil {
return nil, Error.Wrap(err)
}
location, err := metainfo.CreatePath(ctx, projectID, uint32(in.GetSegmentIndex()), in.GetBucket(), in.GetEncryptedPath())
objectLocation := metabase.ObjectLocation{
ProjectID: projectID,
BucketName: string(in.GetBucket()),
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
}
object, err := endpoint.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: objectLocation,
})
if err != nil {
return nil, Error.Wrap(err)
}
pointer, err := endpoint.metainfo.Get(ctx, location.Encode())
segment, err := endpoint.metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: object.StreamID,
Position: metabase.SegmentPositionFromEncoded(uint64(in.GetSegmentIndex())),
})
if err != nil {
return nil, Error.Wrap(err)
}
if pointer.GetType() != pb.Pointer_REMOTE {
if segment.Inline() {
return nil, Error.New("cannot check health of inline segment")
}
return endpoint.segmentHealth(ctx, segment)
}
func (endpoint *Endpoint) segmentHealth(ctx context.Context, segment metabase.Segment) (_ *internalpb.SegmentHealthResponse, err error) {
health := &internalpb.SegmentHealth{}
var nodeIDs storj.NodeIDList
for _, piece := range pointer.GetRemote().GetRemotePieces() {
nodeIDs = append(nodeIDs, piece.NodeId)
for _, piece := range segment.Pieces {
nodeIDs = append(nodeIDs, piece.StorageNode)
}
unreliableOrOfflineNodes, err := endpoint.overlay.KnownUnreliableOrOffline(ctx, nodeIDs)
@ -160,6 +170,13 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.Segm
unreliableOfflineMap[id] = true
}
redundancy := &pb.RedundancyScheme{
MinReq: int32(segment.Redundancy.RequiredShares),
RepairThreshold: int32(segment.Redundancy.RepairShares),
SuccessThreshold: int32(segment.Redundancy.OptimalShares),
Total: int32(segment.Redundancy.TotalShares),
}
var healthyNodes storj.NodeIDList
var unhealthyNodes storj.NodeIDList
for _, id := range nodeIDs {
@ -176,14 +193,12 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.Segm
health.UnhealthyIds = unhealthyNodes
health.OfflineIds = offlineNodes
if uint32(in.GetSegmentIndex()) == metabase.LastSegmentIndex {
health.Segment = []byte("l")
} else {
health.Segment = []byte("s" + strconv.FormatInt(in.GetSegmentIndex(), 10))
}
health.Segment = make([]byte, 8)
binary.LittleEndian.PutUint64(health.Segment, segment.Position.Encode())
return &internalpb.SegmentHealthResponse{
Health: health,
Redundancy: pointer.GetRemote().GetRedundancy(),
Redundancy: redundancy,
}, nil
}

View File

@ -4,21 +4,23 @@
package inspector_test
import (
"bytes"
"context"
"fmt"
"strings"
"encoding/binary"
"errors"
"testing"
"github.com/btcsuite/btcutil/base58"
"github.com/stretchr/testify/require"
"storj.io/common/encryption"
"storj.io/common/memory"
"storj.io/common/paths"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/storage"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/uplink/private/eestream"
)
@ -26,75 +28,106 @@ func TestInspectorStats(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
upl := planet.Uplinks[0]
testData := testrand.Bytes(1 * memory.MiB)
bucket := "testbucket"
projectID := upl.Projects[0].ID
err := uplink.Upload(ctx, planet.Satellites[0], bucket, "test/path", testData)
err := upl.Upload(ctx, planet.Satellites[0], bucket, "test/path", testData)
require.NoError(t, err)
healthEndpoint := planet.Satellites[0].Inspector.Endpoint
// Get path of random segment we just uploaded and check the health
_ = planet.Satellites[0].Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
func(ctx context.Context, it storage.Iterator) error {
var item storage.ListItem
for it.Next(ctx, &item) {
if bytes.Contains(item.Key, []byte(fmt.Sprintf("%s/", bucket))) {
break
}
}
access := upl.Access[satellite.ID()]
serializedAccess, err := access.Serialize()
require.NoError(t, err)
fullPath := storj.SplitPath(item.Key.String())
require.Falsef(t, len(fullPath) < 4, "Could not retrieve a full path from pointerdb")
store, err := encryptionAccess(serializedAccess)
require.NoError(t, err)
projectID := fullPath[0]
bucket := fullPath[2]
encryptedPath := strings.Join(fullPath[3:], "/")
encryptedPath, err := encryption.EncryptPathWithStoreCipher(bucket, paths.NewUnencrypted("test/path"), store)
require.NoError(t, err)
{ // Test Segment Health Request
req := &internalpb.SegmentHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
SegmentIndex: -1,
}
objectLocation := metabase.ObjectLocation{
ProjectID: projectID,
BucketName: "testbucket",
ObjectKey: metabase.ObjectKey(encryptedPath.Raw()),
}
resp, err := healthEndpoint.SegmentHealth(ctx, req)
require.NoError(t, err)
segment, err := satellite.Metainfo.Metabase.GetLatestObjectLastSegment(ctx, metabase.GetLatestObjectLastSegment{
ObjectLocation: objectLocation,
})
require.NoError(t, err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
{ // Test Segment Health Request
req := &internalpb.SegmentHealthRequest{
ProjectId: projectID[:],
EncryptedPath: []byte(encryptedPath.Raw()),
Bucket: []byte(bucket),
SegmentIndex: int64(segment.Position.Encode()),
}
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), resp.GetHealth().GetSegment()))
}
resp, err := healthEndpoint.SegmentHealth(ctx, req)
require.NoError(t, err)
{ // Test Object Health Request
objectHealthReq := &internalpb.ObjectHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
StartAfterSegment: 0,
EndBeforeSegment: 0,
Limit: 0,
}
resp, err := healthEndpoint.ObjectHealth(ctx, objectHealthReq)
require.NoError(t, err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
segments := resp.GetSegments()
require.Len(t, segments, 1)
require.Equal(t, 4, redundancy.TotalCount())
encodedPosition := binary.LittleEndian.Uint64(resp.GetHealth().GetSegment())
position := metabase.SegmentPositionFromEncoded(encodedPosition)
require.Equal(t, segment.Position, position)
}
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
{ // Test Object Health Request
objectHealthReq := &internalpb.ObjectHealthRequest{
ProjectId: projectID[:],
EncryptedPath: []byte(encryptedPath.Raw()),
Bucket: []byte(bucket),
StartAfterSegment: 0,
EndBeforeSegment: 0,
Limit: 0,
}
resp, err := healthEndpoint.ObjectHealth(ctx, objectHealthReq)
require.NoError(t, err)
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), segments[0].GetSegment()))
}
segments := resp.GetSegments()
require.Len(t, segments, 1)
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
require.Equal(t, 4, redundancy.TotalCount())
encodedPosition := binary.LittleEndian.Uint64(segments[0].GetSegment())
position := metabase.SegmentPositionFromEncoded(encodedPosition)
require.Equal(t, segment.Position, position)
}
return nil
},
)
})
}
func encryptionAccess(access string) (*encryption.Store, error) {
data, version, err := base58.CheckDecode(access)
if err != nil || version != 0 {
return nil, errors.New("invalid access grant format")
}
p := new(pb.Scope)
if err := pb.Unmarshal(data, p); err != nil {
return nil, err
}
key, err := storj.NewKey(p.EncryptionAccess.DefaultKey)
if err != nil {
return nil, err
}
store := encryption.NewStore()
store.SetDefaultKey(key)
store.SetDefaultPathCipher(storj.EncAESGCM)
return store, nil
}