metainfo: move FinishDeleteSegment logic to BeginDeleteSegment (#3104)
This commit is contained in:
parent
eb73a3d35f
commit
607da4ab4a
@ -1621,7 +1621,7 @@ func (endpoint *Endpoint) BeginDeleteSegment(ctx context.Context, req *pb.Segmen
|
||||
return nil, status.Error(codes.Unauthenticated, err.Error())
|
||||
}
|
||||
|
||||
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
|
||||
pointer, path, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1636,6 +1636,20 @@ func (endpoint *Endpoint) BeginDeleteSegment(ctx context.Context, req *pb.Segmen
|
||||
}
|
||||
}
|
||||
|
||||
// moved from FinishDeleteSegment to avoid inconsistency if someone will not
|
||||
// call FinishDeleteSegment on uplink side
|
||||
for _, piece := range pointer.GetRemote().GetRemotePieces() {
|
||||
_, err := endpoint.containment.Delete(ctx, piece.NodeId)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
err = endpoint.metainfo.Delete(ctx, path)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
segmentID, err := endpoint.packSegmentID(ctx, &pb.SatSegmentID{
|
||||
StreamId: streamID,
|
||||
OriginalOrderLimits: limits,
|
||||
@ -1661,7 +1675,7 @@ func (endpoint *Endpoint) FinishDeleteSegment(ctx context.Context, req *pb.Segme
|
||||
|
||||
streamID := segmentID.StreamId
|
||||
|
||||
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
|
||||
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
|
||||
Op: macaroon.ActionDelete,
|
||||
Bucket: streamID.Bucket,
|
||||
EncryptedPath: streamID.EncryptedPath,
|
||||
@ -1671,22 +1685,7 @@ func (endpoint *Endpoint) FinishDeleteSegment(ctx context.Context, req *pb.Segme
|
||||
return nil, status.Error(codes.Unauthenticated, err.Error())
|
||||
}
|
||||
|
||||
pointer, path, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, piece := range pointer.GetRemote().GetRemotePieces() {
|
||||
_, err := endpoint.containment.Delete(ctx, piece.NodeId)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
err = endpoint.metainfo.Delete(ctx, path)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
// at the moment logic is in BeginDeleteSegment
|
||||
|
||||
return &pb.SegmentFinishDeleteResponse{}, nil
|
||||
}
|
||||
|
@ -209,7 +209,7 @@ func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segment
|
||||
func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
segmentID, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
|
||||
_, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
|
||||
StreamID: streamID,
|
||||
Position: storj.SegmentPosition{
|
||||
Index: segmentIndex,
|
||||
@ -227,13 +227,8 @@ func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segm
|
||||
}
|
||||
}
|
||||
|
||||
err = s.metainfo.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{
|
||||
SegmentID: segmentID,
|
||||
// TODO add delete results
|
||||
})
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
// don't do FinishDeleteSegment at the moment to avoid satellite round trip
|
||||
// FinishDeleteSegment doesn't implement any specific logic at the moment
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user