uplink: Reduce satellite request using Batch when possible (#3351)

* uplink/metainfo: Return classified Not Found error

Metainfo client Batch method must return the Storj Not Found error class
when the RCP server response with a not found status code as any other
metainfo Client method does.

Also if the error isn't Not Found one, it must wrap the error.

* uplink/storage/streams: Use Batch request in Delete

Change the 2 individual metainfo Client calls that streamStore Delete
method does by a single Batch one.
This commit is contained in:
Ivan Fraixedes 2019-10-24 23:18:48 +02:00 committed by Michal Niewrzal
parent 7f6893ea86
commit d9d82b0336
2 changed files with 34 additions and 15 deletions

View File

@ -1096,7 +1096,11 @@ func (client *Client) Batch(ctx context.Context, requests ...BatchItem) (resp []
Requests: batchItems,
})
if err != nil {
return []BatchResponse{}, err
if errs2.IsRPC(err, rpcstatus.NotFound) {
return []BatchResponse{}, storj.ErrObjectNotFound.Wrap(err)
}
return []BatchResponse{}, Error.Wrap(err)
}
resp = make([]BatchResponse, len(response.Responses))

View File

@ -413,29 +413,44 @@ func (s *streamStore) Delete(ctx context.Context, path Path, pathCipher storj.Ci
return err
}
// TODO do it in batch
streamID, err := s.metainfo.BeginDeleteObject(ctx, metainfo.BeginDeleteObjectParams{
Bucket: []byte(path.Bucket()),
EncryptedPath: []byte(encPath.Raw()),
})
batchItems := []metainfo.BatchItem{
&metainfo.BeginDeleteObjectParams{
Bucket: []byte(path.Bucket()),
EncryptedPath: []byte(encPath.Raw()),
},
&metainfo.ListSegmentsParams{
CursorPosition: storj.SegmentPosition{
Index: 0,
},
},
}
resps, err := s.metainfo.Batch(ctx, batchItems...)
if err != nil {
return err
}
// TODO handle `more`
items, _, err := s.metainfo.ListSegments(ctx, metainfo.ListSegmentsParams{
StreamID: streamID,
CursorPosition: storj.SegmentPosition{
Index: 0,
},
})
if len(resps) != 2 {
return errs.New(
"metainfo.Batch request returned an unexpected number of responses. Want: 2, got: %d", len(resps),
)
}
delResp, err := resps[0].BeginDeleteObject()
if err != nil {
return err
}
listResp, err := resps[1].ListSegment()
if err != nil {
return err
}
// TODO handle listResp.More
var errlist errs.Group
for _, item := range items {
err = s.segments.Delete(ctx, streamID, item.Position.Index)
for _, item := range listResp.Items {
err = s.segments.Delete(ctx, delResp.StreamID, item.Position.Index)
if err != nil {
errlist.Add(err)
continue