satellite/metainfo: migrate GetObjectIP endpoint method to metabase

Change-Id: I0da346fc2ce84787a09d2bb7c4ffc02a5d8a3b2f
This commit is contained in:
Michal Niewrzal 2020-12-07 16:43:57 +01:00
parent bc3be9c416
commit fb5244e8f1

View File

@ -1039,53 +1039,43 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, req.Bucket, req.EncryptedPath)
// TODO we may need custom metabase request to avoid two DB calls
object, err := endpoint.metainfo.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
},
})
if err != nil {
// endpoint.getPointer already returns valid rpcstatus errors
return nil, err
}
var nodeIDs []storj.NodeID
addPointerToNodeIDs := func(pointer *pb.Pointer) {
if pointer.Remote != nil {
for _, piece := range pointer.Remote.RemotePieces {
nodeIDs = append(nodeIDs, piece.NodeId)
}
if storj.ErrObjectNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
}
addPointerToNodeIDs(lastPointer)
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(lastPointer.Metadata, streamMeta)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
numSegmentsKnown := streamMeta.NumberOfSegments > 0
numberOfSegmentsToFetch := int(streamMeta.NumberOfSegments) - 1 // remove last segment since we've already fetch manually
more := true
cursor := metabase.SegmentPosition{}
// If we do not know the number of segments from the streamMeta, we want to
// continue to run this loop until it cannot find another segment (the
// break condition in the loop).
//
// If we do know the number of segments, we want to run the loop as long as
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
// segments.
for i := metabase.FirstSegmentIndex; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch); i++ {
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
var nodeIDs []storj.NodeID
for more {
list, err := endpoint.metainfo.metabaseDB.ListSegments(ctx, metabase.ListSegments{
StreamID: object.StreamID,
Cursor: cursor,
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
pointer, err := endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
break
}
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
addPointerToNodeIDs(pointer)
for _, segment := range list.Segments {
for _, piece := range segment.Pieces {
nodeIDs = append(nodeIDs, piece.StorageNode)
}
cursor = segment.Position
}
more = list.More
}
nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
@ -1670,29 +1660,6 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
}, nil
}
// getPointer returns the pointer and the segment path projectID, bucket and
// encryptedPath. It returns an error with a specific RPC status.
func (endpoint *Endpoint) getPointer(
ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, encryptedPath []byte,
) (pointer *pb.Pointer, location metabase.SegmentLocation, err error) {
defer mon.Task()(&ctx, projectID.String(), segmentIndex, bucket, encryptedPath)(&err)
location, err = CreatePath(ctx, projectID, segmentIndex, bucket, encryptedPath)
if err != nil {
return nil, location, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
pointer, err = endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
return nil, location, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
endpoint.log.Error("error getting the pointer from metainfo service", zap.Error(err))
return nil, location, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return pointer, location, nil
}
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits(limits []*pb.AddressedOrderLimit, segment metabase.Segment) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)