satellite/metainfo: endpoint cleanup

Two small cleanups:
* merging the private commitObject, commitSegment, and
makeInlineSegment into their public versions. The private
variants were only needed while pb.Pointer was still in use
(sketched below).
* removing the unused CreatePath method

Change-Id: Ib18b07473d91259335dab874559ef52412ab813d
Michał Niewrzał, 2021-08-23 13:17:40 +02:00, committed by Michal Niewrzal
parent c29734ef64
commit 5c91ecd271
2 changed files with 27 additions and 60 deletions
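For orientation, a minimal, self-contained Go sketch of the refactoring pattern follows. The stub types and bodies are invented stand-ins (the real types live in pb and the metainfo Endpoint), but the before/after shape mirrors the CommitSegment change in the diff below.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Stub types standing in for the real pb.* and Endpoint types; this is an
// illustration of the refactoring pattern only, not the satellite code.
type SegmentCommitRequest struct{}

type SegmentCommitResponse struct{ SuccessfulPieces int32 }

type Pointer struct{} // relic of the pb.Pointer era

type Endpoint struct{}

// Old shape (removed by this commit): the public method delegated to a
// private one whose extra *Pointer return value and savePointer flag were
// left over from pb.Pointer, so every error path returned three values.
func (e *Endpoint) commitSegmentOld(ctx context.Context, req *SegmentCommitRequest, savePointer bool) (*Pointer, *SegmentCommitResponse, error) {
	if req == nil {
		return nil, nil, errors.New("invalid request")
	}
	return nil, &SegmentCommitResponse{SuccessfulPieces: 1}, nil
}

// New shape: the private wrapper is merged into the public method, which
// returns only the response, so "return nil, nil, err" becomes "return nil, err".
func (e *Endpoint) CommitSegment(ctx context.Context, req *SegmentCommitRequest) (*SegmentCommitResponse, error) {
	if req == nil {
		return nil, errors.New("invalid request")
	}
	return &SegmentCommitResponse{SuccessfulPieces: 1}, nil
}

func main() {
	endpoint := &Endpoint{}
	resp, err := endpoint.CommitSegment(context.Background(), &SegmentCommitRequest{})
	fmt.Println(resp.SuccessfulPieces, err)
}

CommitObject and MakeInlineSegment change the same way in the actual diff: the private method disappears and the public one keeps the whole body.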

First changed file:

@@ -98,7 +98,7 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 var err error
 switch {
 case prevSegmentReq.GetSegmentMakeInline() != nil:
-pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false)
+segmentResp, segmentErr := endpoint.MakeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline())
 prevSegmentReq = nil
 if segmentErr != nil {
 return resp, segmentErr
@@ -109,9 +109,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 SegmentMakeInline: segmentResp,
 },
 })
-response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
+response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
 case prevSegmentReq.GetSegmentCommit() != nil:
-pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false)
+segmentResp, segmentErr := endpoint.CommitSegment(ctx, prevSegmentReq.GetSegmentCommit())
 prevSegmentReq = nil
 if segmentErr != nil {
 return resp, segmentErr
@@ -122,7 +122,7 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 SegmentCommit: segmentResp,
 },
 })
-response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
+response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
 default:
 response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
 }

Second changed file:

@@ -707,12 +707,6 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
 endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
 }

-return endpoint.commitObject(ctx, req, nil)
-}
-
-func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) {
-defer mon.Task()(&ctx)(&err)
-
 streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
 if err != nil {
 return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
@@ -1749,16 +1743,9 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
 }

-_, resp, err = endpoint.commitSegment(ctx, req, true)
-return resp, err
-}
-
-func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (_ *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
-defer mon.Task()(&ctx)(&err)
-
 segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
 if err != nil {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 }

 streamID := segmentID.StreamId
@@ -1770,7 +1757,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 Time: time.Now(),
 })
 if err != nil {
-return nil, nil, err
+return nil, err
 }

 // cheap basic verification
@@ -1781,7 +1768,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
 zap.Stringer("Segment ID", req.SegmentId),
 )
-return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
+return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
 "the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
 numResults, streamID.Redundancy.GetSuccessThreshold(),
 )
@@ -1799,7 +1786,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 err = endpoint.pointerVerification.VerifySizes(ctx, rs, req.SizeEncryptedData, req.UploadResult)
 if err != nil {
 endpoint.log.Debug("piece sizes are invalid", zap.Error(err))
-return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
+return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
 }

 // extract the original order limits
@@ -1812,7 +1799,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 validPieces, invalidPieces, err := endpoint.pointerVerification.SelectValidPieces(ctx, req.UploadResult, originalLimits)
 if err != nil {
 endpoint.log.Debug("pointer verification failed", zap.Error(err))
-return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
+return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
 }

 if len(validPieces) < int(rs.OptimalShares) {
@@ -1836,7 +1823,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 )
 }
 }
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
 }

 pieces := metabase.Pieces{}
@@ -1850,7 +1837,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 id, err := uuid.FromBytes(streamID.StreamId)
 if err != nil {
 endpoint.log.Error("internal", zap.Error(err))
-return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 }

 var expiresAt *time.Time
@@ -1886,11 +1873,11 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 err = endpoint.validateRemoteSegment(ctx, mbCommitSegment, originalLimits)
 if err != nil {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 }

 if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
-return nil, nil, err
+return nil, err
 }

 segmentSize := req.SizeEncryptedData
@@ -1907,7 +1894,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 zap.Int16("redundancy minimum requested", rs.RequiredShares),
 zap.Int16("redundancy total", rs.TotalShares),
 )
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
 }

 if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
@@ -1923,13 +1910,13 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 err = endpoint.metainfo.metabaseDB.CommitSegment(ctx, mbCommitSegment)
 if err != nil {
 if metabase.ErrInvalidRequest.Has(err) {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 }
 endpoint.log.Error("internal", zap.Error(err))
-return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 }

-return nil, &pb.SegmentCommitResponse{
+return &pb.SegmentCommitResponse{
 SuccessfulPieces: int32(len(pieces)),
 }, nil
 }
@@ -1943,17 +1930,9 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
 endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
 }

-_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
-return resp, err
-}
-
-// makeInlineSegment makes inline segment on satellite.
-func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) {
-defer mon.Task()(&ctx)(&err)
-
 streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
 if err != nil {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 }

 keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@@ -1963,20 +1942,20 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
 Time: time.Now(),
 })
 if err != nil {
-return nil, nil, err
+return nil, err
 }

 if req.Position.Index < 0 {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
 }

 inlineUsed := int64(len(req.EncryptedInlineData))
 if inlineUsed > endpoint.encInlineSegmentSize {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize))
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize))
 }

 if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
-return nil, nil, err
+return nil, err
 }

 if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
@@ -1992,7 +1971,7 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
 id, err := uuid.FromBytes(streamID.StreamId)
 if err != nil {
 endpoint.log.Error("internal", zap.Error(err))
-return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 }

 var expiresAt *time.Time
@@ -2024,23 +2003,23 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
 })
 if err != nil {
 if metabase.ErrInvalidRequest.Has(err) {
-return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 }
 endpoint.log.Error("internal", zap.Error(err))
-return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 }

 bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
 err = endpoint.orders.UpdatePutInlineOrder(ctx, bucket, inlineUsed)
 if err != nil {
 endpoint.log.Error("internal", zap.Error(err))
-return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 }

 endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
 mon.Meter("req_put_inline").Mark(1)

-return nil, &pb.SegmentMakeInlineResponse{}, nil
+return &pb.SegmentMakeInlineResponse{}, nil
 }

 // ListSegments list object segments.
@@ -2789,15 +2768,3 @@ func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectI
 return nil
 }
-
-// CreatePath creates a segment key.
-func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex uint32, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
-// TODO rename to CreateLocation
-defer mon.Task()(&ctx)(&err)
-
-return metabase.SegmentLocation{
-ProjectID: projectID,
-BucketName: string(bucket),
-Position: metabase.SegmentPosition{Index: segmentIndex},
-ObjectKey: metabase.ObjectKey(path),
-}, nil
-}