satellite/metainfo: Check ctx in download Object & Segment
Check if the context is done at the beginning of the download object and segment and return right away if it's the case. Check if Redis returned an error due to context cancellation for not logging the error. Change some logging messages of Redis errors to reflect on them if the error happens while downloading an object or a segment. Change-Id: I8ed8ff9ff7bb170b560f41356ea06820ce6c4e12
This commit is contained in:
parent
7d84482381
commit
1d14e4ef60
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"storj.io/common/context2"
|
||||
"storj.io/common/encryption"
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/lrucache"
|
||||
"storj.io/common/macaroon"
|
||||
"storj.io/common/memory"
|
||||
@ -880,6 +881,10 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDownloadRequest) (resp *pb.ObjectDownloadResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Canceled, "client has closed the connection")
|
||||
}
|
||||
|
||||
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
|
||||
if err != nil {
|
||||
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
|
||||
@ -901,6 +906,10 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
}
|
||||
|
||||
if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
endpoint.log.Error(
|
||||
"Retrieving project bandwidth total failed; bandwidth limit won't be enforced",
|
||||
zap.Stringer("Project ID", keyInfo.ProjectID),
|
||||
@ -965,11 +974,15 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
// Update the current bandwidth cache value incrementing the SegmentSize.
|
||||
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, downloadSizes.encryptedSize)
|
||||
if err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
// log it and continue. it's most likely our own fault that we couldn't
|
||||
// track it, and the only thing that will be affected is our per-project
|
||||
// bandwidth limits.
|
||||
endpoint.log.Error(
|
||||
"Could not track the new project's bandwidth usage",
|
||||
"Could not track the new project's bandwidth usage when downloading an object",
|
||||
zap.Stringer("Project ID", keyInfo.ProjectID),
|
||||
zap.Error(err),
|
||||
)
|
||||
@ -2108,6 +2121,10 @@ func convertStreamListResults(result metabase.ListStreamPositionsResult) (*pb.Se
|
||||
func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDownloadRequest) (resp *pb.SegmentDownloadResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Canceled, "client has closed the connection")
|
||||
}
|
||||
|
||||
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
|
||||
if err != nil {
|
||||
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
|
||||
@ -2131,6 +2148,10 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
||||
bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
|
||||
|
||||
if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
endpoint.log.Error(
|
||||
"Retrieving project bandwidth total failed; bandwidth limit won't be enforced",
|
||||
zap.Stringer("Project ID", keyInfo.ProjectID),
|
||||
@ -2179,10 +2200,14 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
||||
// Update the current bandwidth cache value incrementing the SegmentSize.
|
||||
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, int64(segment.EncryptedSize))
|
||||
if err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
// log it and continue. it's most likely our own fault that we couldn't
|
||||
// track it, and the only thing that will be affected is our per-project
|
||||
// bandwidth limits.
|
||||
endpoint.log.Error("Could not track the new project's bandwidth usage",
|
||||
endpoint.log.Error("Could not track the new project's bandwidth usage when downloading a segment",
|
||||
zap.Stringer("Project ID", keyInfo.ProjectID),
|
||||
zap.Error(err),
|
||||
)
|
||||
@ -2497,7 +2522,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
|
||||
CipherSuite: pb.CipherSuite(object.Encryption.CipherSuite),
|
||||
BlockSize: int64(object.Encryption.BlockSize),
|
||||
},
|
||||
//TODO: this is the only one place where placement is not added to the StreamID
|
||||
// TODO: this is the only one place where placement is not added to the StreamID
|
||||
// bucket info would be required to add placement here
|
||||
})
|
||||
if err != nil {
|
||||
@ -2696,6 +2721,10 @@ func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectI
|
||||
|
||||
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, projectID)
|
||||
if err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
endpoint.log.Error(
|
||||
"Retrieving project storage totals failed; storage usage limit won't be enforced",
|
||||
zap.Stringer("Project ID", projectID),
|
||||
|
Loading…
Reference in New Issue
Block a user