satellite/metainfo: replace local constants with metabase values
Change-Id: Ie855c10dd259464658952186251b7d3210eb49ce
This commit is contained in:
parent
9a627b22e8
commit
d199daa907
@ -41,8 +41,6 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
satIDExpiration = 48 * time.Hour
|
satIDExpiration = 48 * time.Hour
|
||||||
lastSegment = -1
|
|
||||||
firstSegment = 0
|
|
||||||
listLimit = 1000
|
listLimit = 1000
|
||||||
|
|
||||||
deleteObjectPiecesSuccessThreshold = 0.75
|
deleteObjectPiecesSuccessThreshold = 0.75
|
||||||
@ -424,12 +422,12 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
|
|||||||
// since from the user's perspective, objects without last segment are invisible.
|
// since from the user's perspective, objects without last segment are invisible.
|
||||||
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
|
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
|
||||||
// Delete all objects that has last segment.
|
// Delete all objects that has last segment.
|
||||||
deletedCount, err := endpoint.deleteByPrefix(ctx, projectID, bucketName, lastSegment)
|
deletedCount, err := endpoint.deleteByPrefix(ctx, projectID, bucketName, metabase.LastSegmentIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
// Delete all zombie objects that have first segment.
|
// Delete all zombie objects that have first segment.
|
||||||
_, err = endpoint.deleteByPrefix(ctx, projectID, bucketName, firstSegment)
|
_, err = endpoint.deleteByPrefix(ctx, projectID, bucketName, metabase.FirstSegmentIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
@ -695,7 +693,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
location, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
|
location, err := CreatePath(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, req.Bucket, req.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
endpoint.log.Error("unable to create path", zap.Error(err))
|
endpoint.log.Error("unable to create path", zap.Error(err))
|
||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
@ -798,7 +796,7 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
|
|||||||
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
|
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
|
||||||
lastSegmentPointer.Metadata = req.EncryptedMetadata
|
lastSegmentPointer.Metadata = req.EncryptedMetadata
|
||||||
|
|
||||||
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath)
|
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, int64(metabase.LastSegmentIndex), streamID.Bucket, streamID.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
endpoint.log.Error("unable to create path", zap.Error(err))
|
endpoint.log.Error("unable to create path", zap.Error(err))
|
||||||
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
||||||
@ -846,7 +844,7 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte, version int32) (*pb.Object, error) {
|
func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte, version int32) (*pb.Object, error) {
|
||||||
pointer, _, err := endpoint.getPointer(ctx, projectID, lastSegment, bucket, encryptedPath)
|
pointer, _, err := endpoint.getPointer(ctx, projectID, metabase.LastSegmentIndex, bucket, encryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -952,7 +950,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
|||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
prefix, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPrefix)
|
prefix, err := CreatePath(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, req.Bucket, req.EncryptedPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
@ -1109,7 +1107,7 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
|
|||||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
|
lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, req.Bucket, req.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// endpoint.getPointer already returns valid rpcstatus errors
|
// endpoint.getPointer already returns valid rpcstatus errors
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -1142,7 +1140,7 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
|
|||||||
// If we do know the number of segments, we want to run the loop as long as
|
// If we do know the number of segments, we want to run the loop as long as
|
||||||
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
|
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
|
||||||
// segments.
|
// segments.
|
||||||
for i := firstSegment; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch); i++ {
|
for i := metabase.FirstSegmentIndex; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch); i++ {
|
||||||
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
|
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
@ -1600,7 +1598,7 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR
|
|||||||
limit = listLimit
|
limit = listLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
|
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if rpcstatus.Code(err) == rpcstatus.NotFound {
|
if rpcstatus.Code(err) == rpcstatus.NotFound {
|
||||||
return &pb.SegmentListResponse{}, nil
|
return &pb.SegmentListResponse{}, nil
|
||||||
@ -1675,7 +1673,7 @@ func (endpoint *Endpoint) listSegmentsFromNumberOfSegments(numberOfSegments, cur
|
|||||||
// last segment is always the last one
|
// last segment is always the last one
|
||||||
segmentItems = append(segmentItems, &pb.SegmentListItem{
|
segmentItems = append(segmentItems, &pb.SegmentListItem{
|
||||||
Position: &pb.SegmentPosition{
|
Position: &pb.SegmentPosition{
|
||||||
Index: lastSegment,
|
Index: metabase.LastSegmentIndex,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1730,7 +1728,7 @@ func (endpoint *Endpoint) listSegmentsManually(ctx context.Context, projectID uu
|
|||||||
if limit > int32(len(segmentItems)) {
|
if limit > int32(len(segmentItems)) {
|
||||||
segmentItems = append(segmentItems, &pb.SegmentListItem{
|
segmentItems = append(segmentItems, &pb.SegmentListItem{
|
||||||
Position: &pb.SegmentPosition{
|
Position: &pb.SegmentPosition{
|
||||||
Index: lastSegment,
|
Index: metabase.LastSegmentIndex,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
@ -1796,7 +1794,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
|||||||
var encryptedKey []byte
|
var encryptedKey []byte
|
||||||
if len(pointer.Metadata) != 0 {
|
if len(pointer.Metadata) != 0 {
|
||||||
var segmentMeta *pb.SegmentMeta
|
var segmentMeta *pb.SegmentMeta
|
||||||
if req.CursorPosition.Index == lastSegment {
|
if req.CursorPosition.Index == metabase.LastSegmentIndex {
|
||||||
streamMeta := &pb.StreamMeta{}
|
streamMeta := &pb.StreamMeta{}
|
||||||
err = pb.Unmarshal(pointer.Metadata, streamMeta)
|
err = pb.Unmarshal(pointer.Metadata, streamMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2114,7 +2112,7 @@ func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKey
|
|||||||
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
|
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
|
||||||
// TODO rename to CreateLocation
|
// TODO rename to CreateLocation
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
if segmentIndex < lastSegment { // lastSegment = -1
|
if segmentIndex < metabase.LastSegmentIndex {
|
||||||
return metabase.SegmentLocation{}, errors.New("invalid segment index")
|
return metabase.SegmentLocation{}, errors.New("invalid segment index")
|
||||||
}
|
}
|
||||||
return metabase.SegmentLocation{
|
return metabase.SegmentLocation{
|
||||||
|
Loading…
Reference in New Issue
Block a user