satellite/metainfo: Don't respond with errors when Redis is down
To keep multi-region satellites resilient, we cannot stop processing upload/download client requests when Redis isn't responding properly. These changes keep client requests going when we cannot check whether the client exceeds its storage or bandwidth limits, or cannot update its used storage/bandwidth, because Redis is not responding successfully or the satellite database returns an error.

Change-Id: Ia7f12c07fc9ffdfad0e7ff052ff3fd81eca0f0e3
parent a73c59bbdd
commit a4d06b9b1e
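Before the diff, here is a minimal, self-contained sketch of the pattern the change applies: a usage-limit check that logs a backend failure and lets the request continue instead of rejecting it. The type and function names below (usageCache, flakyCache, checkStorageUsage) are illustrative stand-ins, not the satellite's actual API.

// Minimal sketch of the "log and continue" behavior this commit introduces.
// The types below are hypothetical stand-ins for the project-usage service.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// usageCache stands in for the project-usage service backed by Redis.
type usageCache interface {
	ExceedsStorageUsage(ctx context.Context, projectID string) (exceeded bool, limit int64, err error)
}

// flakyCache simulates a Redis outage: every call fails.
type flakyCache struct{}

func (flakyCache) ExceedsStorageUsage(context.Context, string) (bool, int64, error) {
	return false, 0, errors.New("redis: connection refused")
}

var errExceeded = errors.New("exceeded usage limit")

// checkStorageUsage rejects the request only when the limit is known to be
// exceeded. If the backend is unreachable, it logs the failure and lets the
// request proceed, so uploads keep working while Redis or the DB is down.
func checkStorageUsage(ctx context.Context, cache usageCache, projectID string) error {
	exceeded, limit, err := cache.ExceedsStorageUsage(ctx, projectID)
	if err != nil {
		log.Printf("storage totals unavailable, limit not enforced: project=%s err=%v", projectID, err)
		return nil // degrade gracefully instead of failing the client request
	}
	if exceeded {
		log.Printf("monthly storage limit exceeded: project=%s limit=%d", projectID, limit)
		return errExceeded
	}
	return nil
}

func main() {
	// Even with the cache down, the upload path is allowed to continue.
	if err := checkStorageUsage(context.Background(), flakyCache{}, "project-1"); err != nil {
		fmt.Println("upload rejected:", err)
		return
	}
	fmt.Println("upload continues despite the cache outage")
}

Note that only the backend failure is tolerated; a limit that is known to be exceeded still rejects the request, which matches the checkExceedsStorageUsage helper introduced at the end of this diff.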
@@ -1170,16 +1170,8 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
 		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
 	}
 
-	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
-	if err != nil {
-		endpoint.log.Error("Retrieving project storage totals failed.", zap.Error(err))
-	}
-	if exceeded {
-		endpoint.log.Error("Monthly storage limit exceeded.",
-			zap.Stringer("Limit", limit),
-			zap.Stringer("Project ID", keyInfo.ProjectID),
-		)
-		return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+	if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
+		return nil, err
 	}
 
 	redundancy, err := eestream.NewRedundancyStrategyFromProto(streamID.Redundancy)
@@ -1311,16 +1303,8 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 		return nil, nil, err
 	}
 
-	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
-	if err != nil {
-		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-	}
-	if exceeded {
-		endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
-			zap.Int64("limit", limit.Int64()),
-			zap.Stringer("Project ID", keyInfo.ProjectID),
-		)
-		return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+	if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
+		return nil, nil, err
 	}
 
 	// clear hashes so we don't store them
@@ -1346,12 +1330,13 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
 	}
 
 	if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
-		endpoint.log.Error("Could not track new storage usage by project",
+		// log it and continue. it's most likely our own fault that we couldn't
+		// track it, and the only thing that will be affected is our per-project
+		// storage limits.
+		endpoint.log.Error("Could not track new project's storage usage",
 			zap.Stringer("Project ID", keyInfo.ProjectID),
 			zap.Error(err),
 		)
-		// but continue. it's most likely our own fault that we couldn't track it, and the only thing
-		// that will be affected is our per-project bandwidth and storage limits.
 	}
 
 	if savePointer {
@@ -1407,22 +1392,18 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
 		return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize))
 	}
 
-	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
-	if err != nil {
-		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-	}
-	if exceeded {
-		endpoint.log.Error("Monthly storage limit exceeded.",
-			zap.Stringer("Limit", limit),
-			zap.Stringer("Project ID", keyInfo.ProjectID),
-		)
-		return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+	if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
+		return nil, nil, err
 	}
 
 	if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
-		endpoint.log.Error("Could not track new storage usage.", zap.Stringer("Project ID", keyInfo.ProjectID), zap.Error(err))
-		// but continue. it's most likely our own fault that we couldn't track it, and the only thing
-		// that will be affected is our per-project bandwidth and storage limits.
+		// log it and continue. it's most likely our own fault that we couldn't
+		// track it, and the only thing that will be affected is our per-project
+		// bandwidth and storage limits.
+		endpoint.log.Error("Could not track new project's storage usage",
+			zap.Stringer("Project ID", keyInfo.ProjectID),
+			zap.Error(err),
+		)
 	}
 
 	metadata, err := pb.Marshal(&pb.SegmentMeta{
@@ -1502,12 +1483,10 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
 
 	bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
 
-	exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID)
-	if err != nil {
-		endpoint.log.Error("Retrieving project bandwidth total failed.", zap.Error(err))
-	}
-	if exceeded {
-		endpoint.log.Error("Monthly bandwidth limit exceeded.",
+	if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
+		endpoint.log.Error("Retrieving project bandwidth total failed; bandwidth limit won't be enforced", zap.Error(err))
+	} else if exceeded {
+		endpoint.log.Error("Monthly bandwidth limit exceeded",
 			zap.Stringer("Limit", limit),
 			zap.Stringer("Project ID", keyInfo.ProjectID),
 		)
@@ -1520,9 +1499,14 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
 	}
 
 	// Update the current bandwidth cache value incrementing the SegmentSize.
-	err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	if err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize); err != nil {
+		// log it and continue. it's most likely our own fault that we couldn't
+		// track it, and the only thing that will be affected is our per-project
+		// bandwidth limits.
+		endpoint.log.Error("Could not track the new project's bandwidth usage",
+			zap.Stringer("Project ID", keyInfo.ProjectID),
+			zap.Error(err),
+		)
 	}
 
 	segmentID, err := endpoint.packSegmentID(ctx, &internalpb.SegmentID{})
@@ -1837,6 +1821,26 @@ func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKey
 	return &pb.RevokeAPIKeyResponse{}, nil
 }
 
+func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, projectID)
+	if err != nil {
+		endpoint.log.Error(
+			"Retrieving project storage totals failed; storage usage limit won't be enforced",
+			zap.Error(err),
+		)
+	} else if exceeded {
+		endpoint.log.Error("Monthly storage limit exceeded",
+			zap.Stringer("Limit", limit),
+			zap.Stringer("Project ID", projectID),
+		)
+		return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+	}
+
+	return nil
+}
+
 // CreatePath creates a segment key.
 func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
 	// TODO rename to CreateLocation