satellite/metainfo: upload/download with metabase

This change is adjusting metainfo endpoint to use metabase for uploading
and downloading remote objects. Inline segments will be added later.

Change-Id: I109d45bf644cd48096c47361043ebd8dfeaea0f3
This commit is contained in:
Michal Niewrzal 2020-11-06 12:54:52 +01:00
parent 2bd239bb75
commit 3fe16f4003
8 changed files with 478 additions and 403 deletions

2
go.mod
View File

@ -46,5 +46,5 @@ require (
storj.io/drpc v0.0.16
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
storj.io/private v0.0.0-20201026143115-bc926bfa3bca
storj.io/uplink v1.3.2-0.20201106105834-ec8e5cc29f7e
storj.io/uplink v1.3.2-0.20201109124414-ccb0c91f4a8c
)

4
go.sum
View File

@ -914,5 +914,5 @@ storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b h1:Bbg9JCtY6l3HrDxs3BX
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20201026143115-bc926bfa3bca h1:ekR7vtUYC5+cDyim0ZJaSZeXidyzQqDYsnFPYXgTozc=
storj.io/private v0.0.0-20201026143115-bc926bfa3bca/go.mod h1:EaLnIyNyqWQUJB+7+KWVez0In9czl0nHHlm2WobebuA=
storj.io/uplink v1.3.2-0.20201106105834-ec8e5cc29f7e h1:a58hzcYciTvBtWST+Byoj76NWuYdnPcz2GK8ynyEyfA=
storj.io/uplink v1.3.2-0.20201106105834-ec8e5cc29f7e/go.mod h1:mrdt4I4EhPRC7cnvCD5490IBm423pgKrVoUiC9a5Srg=
storj.io/uplink v1.3.2-0.20201109124414-ccb0c91f4a8c h1:o+bxDRF7QvNCOM7lZI8EBV6xridymSt6Lljy/kmmPeA=
storj.io/uplink v1.3.2-0.20201109124414-ccb0c91f4a8c/go.mod h1:mrdt4I4EhPRC7cnvCD5490IBm423pgKrVoUiC9a5Srg=

View File

@ -170,6 +170,20 @@ type MetabaseDB interface {
MigrateToLatest(ctx context.Context) error
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
DeleteObjectsAllVersions(ctx context.Context, opts metabase.DeleteObjectsAllVersions) (result metabase.DeleteObjectResult, err error)
// BeginObjectExactVersion adds a pending object to the database, with specific version.
BeginObjectExactVersion(ctx context.Context, opts metabase.BeginObjectExactVersion) (committed metabase.Version, err error)
// CommitObject adds a pending object to the database.
CommitObject(ctx context.Context, opts metabase.CommitObject) (object metabase.Object, err error)
// BeginSegment verifies whether a new segment upload can be started.
BeginSegment(ctx context.Context, opts metabase.BeginSegment) (err error)
// CommitSegment commits segment to the database.
CommitSegment(ctx context.Context, opts metabase.CommitSegment) (err error)
// GetObjectLatestVersion returns object information for latest version.
GetObjectLatestVersion(ctx context.Context, opts metabase.GetObjectLatestVersion) (_ metabase.Object, err error)
// GetSegmentByPosition returns a information about segment which covers specified offset.
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
// GetLatestObjectLastSegment returns an object last segment information.
GetLatestObjectLastSegment(ctx context.Context, opts metabase.GetLatestObjectLastSegment) (segment metabase.Segment, err error)
// InternalImplementation returns *metabase.DB.
// TODO: remove.

View File

@ -159,7 +159,7 @@ func (seg *GetSegmentByPosition) Verify() error {
return nil
}
// GetSegmentByPosition returns a segment information.
// GetSegmentByPosition returns a information about segment which covers specified offset.
func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPosition) (segment Segment, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -135,21 +135,9 @@ func NewEndpoint(log *zap.Logger, metainfo *Service, deletePieces *piecedeletion
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
func calculateSpaceUsed(ptr *pb.Pointer) (segmentSize, totalStored int64) {
inline := ptr.GetInlineSegment()
if inline != nil {
inlineSize := int64(len(inline))
return inlineSize, inlineSize
}
segmentSize = ptr.GetSegmentSize()
remote := ptr.GetRemote()
if remote == nil {
return 0, 0
}
minReq := remote.GetRedundancy().GetMinReq()
pieceSize := segmentSize / int64(minReq)
pieces := remote.GetRemotePieces()
return segmentSize, pieceSize * int64(len(pieces))
func calculateSpaceUsed(segmentSize int64, numberOfPieces int, rs storj.RedundancyScheme) (totalStored int64) {
pieceSize := segmentSize / int64(rs.RequiredShares)
return pieceSize * int64(numberOfPieces)
}
// filterValidPieces filter out the invalid remote pieces held by pointer.
@ -159,58 +147,58 @@ func calculateSpaceUsed(ptr *pb.Pointer) (segmentSize, totalStored int64) {
//
// The method always return a gRPC status error so the caller can directly
// return it to the client.
func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
defer mon.Task()(&ctx)(&err)
// func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
// defer mon.Task()(&ctx)(&err)
if pointer.Type != pb.Pointer_REMOTE {
return nil
}
// if pointer.Type != pb.Pointer_REMOTE {
// return nil
// }
// verify that the piece sizes matches what we would expect.
err = endpoint.pointerVerification.VerifySizes(ctx, pointer)
if err != nil {
endpoint.log.Debug("piece sizes are invalid", zap.Error(err))
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
}
// // verify that the piece sizes matches what we would expect.
// err = endpoint.pointerVerification.VerifySizes(ctx, pointer)
// if err != nil {
// endpoint.log.Debug("piece sizes are invalid", zap.Error(err))
// return rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
// }
validPieces, invalidPieces, err := endpoint.pointerVerification.SelectValidPieces(ctx, pointer, originalLimits)
if err != nil {
endpoint.log.Debug("pointer verification failed", zap.Error(err))
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
}
// validPieces, invalidPieces, err := endpoint.pointerVerification.SelectValidPieces(ctx, pointer, originalLimits)
// if err != nil {
// endpoint.log.Debug("pointer verification failed", zap.Error(err))
// return rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
// }
remote := pointer.Remote
// remote := pointer.Remote
if int32(len(validPieces)) < remote.Redundancy.SuccessThreshold {
endpoint.log.Debug("Number of valid pieces is less than the success threshold",
zap.Int("totalReceivedPieces", len(remote.RemotePieces)),
zap.Int("validPieces", len(validPieces)),
zap.Int("invalidPieces", len(invalidPieces)),
zap.Int32("successThreshold", remote.Redundancy.SuccessThreshold),
)
// if int32(len(validPieces)) < remote.Redundancy.SuccessThreshold {
// endpoint.log.Debug("Number of valid pieces is less than the success threshold",
// zap.Int("totalReceivedPieces", len(remote.RemotePieces)),
// zap.Int("validPieces", len(validPieces)),
// zap.Int("invalidPieces", len(invalidPieces)),
// zap.Int32("successThreshold", remote.Redundancy.SuccessThreshold),
// )
errMsg := fmt.Sprintf("Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces",
len(validPieces),
remote.Redundancy.SuccessThreshold,
len(remote.RemotePieces),
)
if len(invalidPieces) > 0 {
errMsg = fmt.Sprintf("%s. Invalid Pieces:", errMsg)
// errMsg := fmt.Sprintf("Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces",
// len(validPieces),
// remote.Redundancy.SuccessThreshold,
// len(remote.RemotePieces),
// )
// if len(invalidPieces) > 0 {
// errMsg = fmt.Sprintf("%s. Invalid Pieces:", errMsg)
for _, p := range invalidPieces {
errMsg = fmt.Sprintf("%s\nNodeID: %v, PieceNum: %d, Reason: %s",
errMsg, p.NodeID, p.PieceNum, p.Reason,
)
}
}
// for _, p := range invalidPieces {
// errMsg = fmt.Sprintf("%s\nNodeID: %v, PieceNum: %d, Reason: %s",
// errMsg, p.NodeID, p.PieceNum, p.Reason,
// )
// }
// }
return rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
}
// return rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
// }
remote.RemotePieces = validPieces
// remote.RemotePieces = validPieces
return nil
}
// return nil
// }
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
@ -664,25 +652,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket); err != nil {
return nil, err
}
// use only satellite values for Redundancy Scheme
pbRS := endpoint.defaultRS
streamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
Redundancy: pbRS,
CreationDate: time.Now(),
ExpirationDate: req.ExpiresAt,
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: req.Bucket,
@ -710,6 +679,52 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
}
}
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket); err != nil {
return nil, err
}
// use only satellite values for Redundancy Scheme
pbRS := endpoint.defaultRS
streamID, err := uuid.New()
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// TODO this will work only with newsest uplink
// figue out what to do with this
encryptionParameters := storj.EncryptionParameters{
CipherSuite: storj.CipherSuite(req.EncryptionParameters.CipherSuite),
BlockSize: int32(req.EncryptionParameters.BlockSize), // TODO check conversion
}
_, err = endpoint.metainfo.metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
StreamID: streamID,
Version: metabase.Version(1),
},
Encryption: encryptionParameters,
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
Redundancy: pbRS,
CreationDate: time.Now(),
ExpirationDate: req.ExpiresAt,
StreamId: streamID[:],
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Object Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "object"))
mon.Meter("req_put_object").Mark(1)
@ -717,7 +732,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
StreamId: streamID,
StreamId: satStreamID,
RedundancyScheme: pbRS,
}, nil
}
@ -751,55 +766,27 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
if metadataSize > endpoint.config.MaxMetadataSize {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("Metadata is too large, got %v, maximum allowed is %v", metadataSize, endpoint.config.MaxMetadataSize))
}
streamMeta := pb.StreamMeta{}
err = pb.Unmarshal(req.EncryptedMetadata, &streamMeta)
id, err := uuid.FromBytes(streamID.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
lastSegmentPointer := pointer
if pointer == nil {
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
var lastSegmentPointerBytes []byte
lastSegmentKey := lastSegmentLocation.Encode()
lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentKey)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.ByteString("segmentPath", lastSegmentKey), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
}
err = endpoint.metainfo.Delete(ctx, lastSegmentKey, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.ByteString("segmentPath", lastSegmentKey), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
}
if lastSegmentPointer.Remote == nil {
lastSegmentPointer.Remote = &pb.RemoteSegment{}
}
// RS is set always for last segment to emulate RS per object
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
lastSegmentPointer.Metadata = req.EncryptedMetadata
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, int64(metabase.LastSegmentIndex), streamID.Bucket, streamID.EncryptedPath)
_, err = endpoint.metainfo.metabaseDB.CommitObject(ctx, metabase.CommitObject{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
ObjectKey: metabase.ObjectKey(streamID.EncryptedPath),
StreamID: id,
Version: metabase.Version(1),
},
EncryptedMetadata: req.EncryptedMetadata,
EncryptedMetadataNonce: req.EncryptedMetadataNonce[:],
})
if err != nil {
endpoint.log.Error("unable to create path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
err = endpoint.metainfo.UnsynchronizedPut(ctx, lastSegmentLocation.Encode(), lastSegmentPointer)
if err != nil {
endpoint.log.Error("unable to put pointer", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return &pb.ObjectCommitResponse{}, nil
@ -838,14 +825,18 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
}
func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte, version int32) (*pb.Object, error) {
pointer, _, err := endpoint.getPointer(ctx, projectID, metabase.LastSegmentIndex, bucket, encryptedPath)
if err != nil {
return nil, err
}
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
metaObject, err := endpoint.metainfo.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: metabase.ObjectLocation{
ProjectID: projectID,
BucketName: string(bucket),
ObjectKey: metabase.ObjectKey(encryptedPath),
},
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -854,61 +845,32 @@ func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bu
EncryptedPath: encryptedPath,
Version: version,
CreationDate: time.Now(),
StreamId: metaObject.StreamID[:],
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
expires := time.Time{}
if metaObject.ExpiresAt != nil {
expires = *metaObject.ExpiresAt
}
object := &pb.Object{
Bucket: bucket,
EncryptedPath: encryptedPath,
Version: -1,
Version: int32(metaObject.Version), // TODO incomatible types
StreamId: streamID,
ExpiresAt: pointer.ExpirationDate,
CreatedAt: pointer.CreationDate,
EncryptedMetadata: pointer.Metadata,
ExpiresAt: expires,
CreatedAt: metaObject.CreatedAt,
EncryptedMetadata: metaObject.EncryptedMetadata,
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(streamMeta.EncryptionType),
BlockSize: int64(streamMeta.EncryptionBlockSize),
CipherSuite: pb.CipherSuite(metaObject.Encryption.CipherSuite),
BlockSize: int64(metaObject.Encryption.BlockSize),
},
}
if pointer.Remote != nil {
object.RedundancyScheme = pointer.Remote.Redundancy
// NumberOfSegments == 0 - pointer with encrypted num of segments
// NumberOfSegments > 1 - pointer with unencrypted num of segments and multiple segments
} else if streamMeta.NumberOfSegments == 0 || streamMeta.NumberOfSegments > 1 {
// workaround
// The new metainfo API redundancy scheme is on object level (not per segment).
// Because of that, RS is always taken from the last segment.
// The old implementation saves RS per segment, and in some cases
// when the remote file's last segment is an inline segment, we end up
// missing an RS scheme. This loop will search for RS in segments other than the last one.
index := int64(0)
for {
location, err := CreatePath(ctx, projectID, index, bucket, encryptedPath)
if err != nil {
endpoint.log.Error("unable to get pointer path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get object")
}
pointer, err = endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
break
}
endpoint.log.Error("unable to get pointer", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get object")
}
if pointer.Remote != nil {
object.RedundancyScheme = pointer.Remote.Redundancy
break
}
index++
}
// TODO extend DownloadSegment response to provide RS values for client
RedundancyScheme: endpoint.defaultRS,
}
return object, nil
@ -1195,8 +1157,41 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
id, err := uuid.FromBytes(streamID.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
pieces := metabase.Pieces{}
for i, limit := range addressedLimits {
pieces = append(pieces, metabase.Piece{
Number: uint16(i),
StorageNode: limit.Limit.StorageNodeId,
})
}
err = endpoint.metainfo.metabaseDB.BeginSegment(ctx, metabase.BeginSegment{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
ObjectKey: metabase.ObjectKey(streamID.EncryptedPath),
StreamID: id,
Version: 1,
},
Position: metabase.SegmentPosition{
Part: uint32(req.Position.PartNumber),
Index: uint32(req.Position.Index),
},
RootPieceID: rootPieceID,
Pieces: pieces,
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID, err := endpoint.packSegmentID(ctx, &internalpb.SegmentID{
StreamId: streamID,
PartNumber: req.Position.PartNumber,
Index: req.Position.Index,
OriginalOrderLimits: addressedLimits,
RootPieceId: rootPieceID,
@ -1221,7 +1216,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
return resp, err
}
func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (_ *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
@ -1253,55 +1248,22 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
)
}
pieces := make([]*pb.RemotePiece, len(req.UploadResult))
for i, result := range req.UploadResult {
pieces[i] = &pb.RemotePiece{
PieceNum: result.PieceNum,
NodeId: result.NodeId,
Hash: result.Hash,
}
}
remote := &pb.RemoteSegment{
Redundancy: streamID.Redundancy,
RootPieceId: segmentID.RootPieceId,
RemotePieces: pieces,
}
// TODO bring back validation
metadata, err := pb.Marshal(&pb.SegmentMeta{
EncryptedKey: req.EncryptedKey,
KeyNonce: req.EncryptedKeyNonce.Bytes(),
})
if err != nil {
endpoint.log.Error("unable to marshal segment metadata", zap.Error(err))
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// orderLimits := make([]*pb.OrderLimit, len(segmentID.OriginalOrderLimits))
// for i, orderLimit := range segmentID.OriginalOrderLimits {
// orderLimits[i] = orderLimit.Limit
// }
pointer = &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: remote,
SegmentSize: req.SizeEncryptedData,
// err = endpoint.validatePointer(ctx, pointer, orderLimits)
// if err != nil {
// return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
// }
CreationDate: streamID.CreationDate,
ExpirationDate: streamID.ExpirationDate,
Metadata: metadata,
PieceHashesVerified: true,
}
orderLimits := make([]*pb.OrderLimit, len(segmentID.OriginalOrderLimits))
for i, orderLimit := range segmentID.OriginalOrderLimits {
orderLimits[i] = orderLimit.Limit
}
err = endpoint.validatePointer(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, err
}
// err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
// if err != nil {
// return nil, nil, err
// }
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
@ -1315,26 +1277,38 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
// clear hashes so we don't store them
for _, piece := range pointer.GetRemote().GetRemotePieces() {
piece.Hash = nil
pieces := metabase.Pieces{}
for _, result := range req.UploadResult {
pieces = append(pieces, metabase.Piece{
Number: uint16(result.PieceNum),
StorageNode: result.NodeId,
})
}
segmentSize, totalStored := calculateSpaceUsed(pointer)
rs := storj.RedundancyScheme{
Algorithm: storj.RedundancyAlgorithm(streamID.Redundancy.Type),
RequiredShares: int16(streamID.Redundancy.MinReq),
RepairShares: int16(streamID.Redundancy.RepairThreshold),
OptimalShares: int16(streamID.Redundancy.SuccessThreshold),
TotalShares: int16(streamID.Redundancy.Total),
ShareSize: streamID.Redundancy.ErasureShareSize,
}
segmentSize := req.SizeEncryptedData
totalStored := calculateSpaceUsed(segmentSize, len(pieces), rs)
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if pointer.Type == pb.Pointer_REMOTE {
// We cannot have more redundancy than total/min
if float64(totalStored) > (float64(pointer.SegmentSize)/float64(pointer.Remote.Redundancy.MinReq))*float64(pointer.Remote.Redundancy.Total) {
endpoint.log.Debug("data size mismatch",
zap.Int64("segment", pointer.SegmentSize),
zap.Int64("pieces", totalStored),
zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
)
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
// We cannot have more redundancy than total/min
if float64(totalStored) > (float64(segmentSize)/float64(rs.RequiredShares))*float64(rs.TotalShares) {
endpoint.log.Debug("data size mismatch",
zap.Int64("segment", segmentSize),
zap.Int64("pieces", totalStored),
zap.Int16("redundancy minimum requested", rs.RequiredShares),
zap.Int16("redundancy total", rs.TotalShares),
)
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
@ -1346,20 +1320,41 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
// that will be affected is our per-project bandwidth and storage limits.
}
if savePointer {
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.UnsynchronizedPut(ctx, location.Encode(), pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
id, err := uuid.FromBytes(streamID.StreamId)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return pointer, &pb.SegmentCommitResponse{
SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)),
err = endpoint.metainfo.metabaseDB.CommitSegment(ctx, metabase.CommitSegment{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
ObjectKey: metabase.ObjectKey(streamID.EncryptedPath),
StreamID: id,
Version: 1,
},
EncryptedKey: req.EncryptedKey,
EncryptedKeyNonce: req.EncryptedKeyNonce[:],
EncryptedSize: int32(req.SizeEncryptedData), // TODO verify int32 vs int64
Position: metabase.SegmentPosition{
Part: uint32(segmentID.PartNumber),
Index: uint32(segmentID.Index),
},
RootPieceID: segmentID.RootPieceId,
Redundancy: rs,
Pieces: pieces,
PlainSize: 1, // TODO
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return nil, &pb.SegmentCommitResponse{
SuccessfulPieces: int32(len(pieces)),
}, nil
}
@ -1506,53 +1501,48 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.CursorPosition.Index), streamID.Bucket, streamID.EncryptedPath)
id, err := uuid.FromBytes(streamID.StreamId)
if err != nil {
return nil, err
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
var segment metabase.Segment
if req.CursorPosition.PartNumber == 0 && req.CursorPosition.Index == -1 {
segment, err = endpoint.metainfo.metabaseDB.GetLatestObjectLastSegment(ctx, metabase.GetLatestObjectLastSegment{
ObjectLocation: metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
ObjectKey: metabase.ObjectKey(streamID.EncryptedPath),
},
})
} else {
segment, err = endpoint.metainfo.metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: id,
Position: metabase.SegmentPosition{
Part: uint32(req.CursorPosition.PartNumber),
Index: uint32(req.CursorPosition.Index),
},
})
}
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// Update the current bandwidth cache value incrementing the SegmentSize.
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize)
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, int64(segment.EncryptedSize))
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID, err := endpoint.packSegmentID(ctx, &internalpb.SegmentID{})
encryptedKeyNonce, err := storj.NonceFromBytes(segment.EncryptedKeyNonce)
if err != nil {
endpoint.log.Error("unable to get encryption key nonce from metadata", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
var encryptedKeyNonce storj.Nonce
var encryptedKey []byte
if len(pointer.Metadata) != 0 {
var segmentMeta *pb.SegmentMeta
if req.CursorPosition.Index == metabase.LastSegmentIndex {
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentMeta = streamMeta.LastSegmentMeta
} else {
segmentMeta = &pb.SegmentMeta{}
err = pb.Unmarshal(pointer.Metadata, segmentMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
if segmentMeta != nil {
encryptedKeyNonce, err = storj.NonceFromBytes(segmentMeta.KeyNonce)
if err != nil {
endpoint.log.Error("unable to get encryption key nonce from metadata", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encryptedKey = segmentMeta.EncryptedKey
}
}
if pointer.Type == pb.Pointer_INLINE {
err := endpoint.orders.UpdateGetInlineOrder(ctx, bucket, int64(len(pointer.InlineSegment)))
if segment.Redundancy.IsZero() { // TODO maybe add method Segment.Inline() bool
// Inline segment
err := endpoint.orders.UpdateGetInlineOrder(ctx, bucket, int64(len(segment.InlineData)))
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -1560,50 +1550,47 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
mon.Meter("req_get_inline").Mark(1)
return &pb.SegmentDownloadResponse{
SegmentId: segmentID,
SegmentSize: pointer.SegmentSize,
EncryptedInlineData: pointer.InlineSegment,
SegmentSize: int64(segment.EncryptedSize),
EncryptedInlineData: segment.InlineData,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedKey: encryptedKey,
}, nil
} else if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, bucket, pointer)
if err != nil {
if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
endpoint.log.Error("Unable to create order limits.",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Stringer("API Key ID", keyInfo.ID),
zap.Error(err),
)
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
limits = sortLimits(limits, pointer)
// workaround to avoid sending nil values on top level
for i := range limits {
if limits[i] == nil {
limits[i] = &pb.AddressedOrderLimit{}
}
}
endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
mon.Meter("req_get_remote").Mark(1)
return &pb.SegmentDownloadResponse{
SegmentId: segmentID,
AddressedLimits: limits,
PrivateKey: privateKey,
SegmentSize: pointer.SegmentSize,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedKey: encryptedKey,
EncryptedKey: segment.EncryptedKey,
}, nil
}
return &pb.SegmentDownloadResponse{}, rpcstatus.Error(rpcstatus.Internal, "invalid type of pointer")
// Remote segment
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits2(ctx, bucket, segment)
if err != nil {
if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
endpoint.log.Error("Unable to create order limits.",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Stringer("API Key ID", keyInfo.ID),
zap.Error(err),
)
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
limits = sortLimits(limits, segment)
// workaround to avoid sending nil values on top level
for i := range limits {
if limits[i] == nil {
limits[i] = &pb.AddressedOrderLimit{}
}
}
endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
mon.Meter("req_get_remote").Mark(1)
return &pb.SegmentDownloadResponse{
AddressedLimits: limits,
PrivateKey: privateKey,
SegmentSize: int64(segment.EncryptedSize),
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedKey: segment.EncryptedKey,
}, nil
}
// getPointer returns the pointer and the segment path projectID, bucket and
@ -1630,10 +1617,10 @@ func (endpoint *Endpoint) getPointer(
}
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal())
for _, piece := range pointer.GetRemote().GetRemotePieces() {
sorted[piece.GetPieceNum()] = getLimitByStorageNodeID(limits, piece.NodeId)
func sortLimits(limits []*pb.AddressedOrderLimit, segment metabase.Segment) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)
for _, piece := range segment.Pieces {
sorted[piece.Number] = getLimitByStorageNodeID(limits, piece.StorageNode)
}
return sorted
}

View File

@ -1512,8 +1512,11 @@ func TestCommitObjectMetadataSize(t *testing.T) {
OptimalShares: 3,
TotalShares: 4,
},
EncryptionParameters: storj.EncryptionParameters{},
ExpiresAt: time.Now().Add(24 * time.Hour),
EncryptionParameters: storj.EncryptionParameters{
BlockSize: 256,
CipherSuite: storj.EncNull,
},
ExpiresAt: time.Now().Add(24 * time.Hour),
}
beginObjectResponse, err := metainfoClient.BeginObject(ctx, params)
require.NoError(t, err)
@ -1554,6 +1557,9 @@ func TestCommitObjectMetadataSize(t *testing.T) {
}
err = metainfoClient.CommitSegment(ctx, metainfo.CommitSegmentParams{
SegmentID: segmentID,
Encryption: storj.SegmentEncryption{
EncryptedKey: []byte{1},
},
SizeEncryptedData: memory.MiB.Int64(),
UploadResult: []*pb.SegmentPieceUploadResult{

View File

@ -8,13 +8,11 @@ import (
"context"
"crypto/subtle"
"regexp"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/time/rate"
"storj.io/common/encryption"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
@ -204,91 +202,91 @@ func isDigit(r byte) bool {
return r >= '0' && r <= '9'
}
func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
defer mon.Task()(&ctx)(&err)
// func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
// defer mon.Task()(&ctx)(&err)
if pointer == nil {
return Error.New("no pointer specified")
}
// if pointer == nil {
// return Error.New("no pointer specified")
// }
if pointer.Type == pb.Pointer_INLINE && pointer.Remote != nil {
return Error.New("pointer type is INLINE but remote segment is set")
}
// if pointer.Type == pb.Pointer_INLINE && pointer.Remote != nil {
// return Error.New("pointer type is INLINE but remote segment is set")
// }
if pointer.Type == pb.Pointer_REMOTE {
switch {
case pointer.Remote == nil:
return Error.New("no remote segment specified")
case pointer.Remote.RemotePieces == nil:
return Error.New("no remote segment pieces specified")
case pointer.Remote.Redundancy == nil:
return Error.New("no redundancy scheme specified")
}
// if pointer.Type == pb.Pointer_REMOTE {
// switch {
// case pointer.Remote == nil:
// return Error.New("no remote segment specified")
// case pointer.Remote.RemotePieces == nil:
// return Error.New("no remote segment pieces specified")
// case pointer.Remote.Redundancy == nil:
// return Error.New("no redundancy scheme specified")
// }
remote := pointer.Remote
// remote := pointer.Remote
if len(originalLimits) == 0 {
return Error.New("no order limits")
}
if int32(len(originalLimits)) != remote.Redundancy.Total {
return Error.New("invalid no order limit for piece")
}
// if len(originalLimits) == 0 {
// return Error.New("no order limits")
// }
// if int32(len(originalLimits)) != remote.Redundancy.Total {
// return Error.New("invalid no order limit for piece")
// }
maxAllowed, err := encryption.CalcEncryptedSize(endpoint.config.MaxSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return err
}
// maxAllowed, err := encryption.CalcEncryptedSize(endpoint.config.MaxSegmentSize.Int64(), storj.EncryptionParameters{
// CipherSuite: storj.EncAESGCM,
// BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
// })
// if err != nil {
// return err
// }
if pointer.SegmentSize > maxAllowed || pointer.SegmentSize < 0 {
return Error.New("segment size %v is out of range, maximum allowed is %v", pointer.SegmentSize, maxAllowed)
}
// if pointer.SegmentSize > maxAllowed || pointer.SegmentSize < 0 {
// return Error.New("segment size %v is out of range, maximum allowed is %v", pointer.SegmentSize, maxAllowed)
// }
pieceNums := make(map[int32]struct{})
nodeIds := make(map[storj.NodeID]struct{})
for _, piece := range remote.RemotePieces {
if piece.PieceNum >= int32(len(originalLimits)) {
return Error.New("invalid piece number")
}
// pieceNums := make(map[int32]struct{})
// nodeIds := make(map[storj.NodeID]struct{})
// for _, piece := range remote.RemotePieces {
// if piece.PieceNum >= int32(len(originalLimits)) {
// return Error.New("invalid piece number")
// }
limit := originalLimits[piece.PieceNum]
// limit := originalLimits[piece.PieceNum]
if limit == nil {
return Error.New("empty order limit for piece")
}
// if limit == nil {
// return Error.New("empty order limit for piece")
// }
err := endpoint.orders.VerifyOrderLimitSignature(ctx, limit)
if err != nil {
return err
}
// err := endpoint.orders.VerifyOrderLimitSignature(ctx, limit)
// if err != nil {
// return err
// }
// expect that too much time has not passed between order limit creation and now
if time.Since(limit.OrderCreation) > endpoint.config.MaxCommitInterval {
return Error.New("Segment not committed before max commit interval of %f minutes.", endpoint.config.MaxCommitInterval.Minutes())
}
// // expect that too much time has not passed between order limit creation and now
// if time.Since(limit.OrderCreation) > endpoint.config.MaxCommitInterval {
// return Error.New("Segment not committed before max commit interval of %f minutes.", endpoint.config.MaxCommitInterval.Minutes())
// }
derivedPieceID := remote.RootPieceId.Derive(piece.NodeId, piece.PieceNum)
if limit.PieceId.IsZero() || limit.PieceId != derivedPieceID {
return Error.New("invalid order limit piece id")
}
if piece.NodeId != limit.StorageNodeId {
return Error.New("piece NodeID != order limit NodeID")
}
// derivedPieceID := remote.RootPieceId.Derive(piece.NodeId, piece.PieceNum)
// if limit.PieceId.IsZero() || limit.PieceId != derivedPieceID {
// return Error.New("invalid order limit piece id")
// }
// if piece.NodeId != limit.StorageNodeId {
// return Error.New("piece NodeID != order limit NodeID")
// }
if _, ok := pieceNums[piece.PieceNum]; ok {
return Error.New("piece num %d is duplicated", piece.PieceNum)
}
// if _, ok := pieceNums[piece.PieceNum]; ok {
// return Error.New("piece num %d is duplicated", piece.PieceNum)
// }
if _, ok := nodeIds[piece.NodeId]; ok {
return Error.New("node id %s for piece num %d is duplicated", piece.NodeId.String(), piece.PieceNum)
}
// if _, ok := nodeIds[piece.NodeId]; ok {
// return Error.New("node id %s for piece num %d is duplicated", piece.NodeId.String(), piece.PieceNum)
// }
pieceNums[piece.PieceNum] = struct{}{}
nodeIds[piece.NodeId] = struct{}{}
}
}
// pieceNums[piece.PieceNum] = struct{}{}
// nodeIds[piece.NodeId] = struct{}{}
// }
// }
return nil
}
// return nil
// }

View File

@ -203,6 +203,76 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
return signer.AddressedLimits, signer.PrivateKey, nil
}
// CreateGetOrderLimits2 creates the order limits for downloading the pieces of pointer.
func (service *Service) CreateGetOrderLimits2(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
nodeIDs := make([]storj.NodeID, len(segment.Pieces))
for i, piece := range segment.Pieces {
nodeIDs[i] = piece.StorageNode
}
nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
if err != nil {
service.log.Debug("error getting nodes from overlay", zap.Error(err))
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), pieceSize, bucket)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
neededLimits := segment.Redundancy.DownloadNodes()
pieces := segment.Pieces
for _, pieceIndex := range service.perm(len(pieces)) {
piece := pieces[pieceIndex]
node, ok := nodes[piece.StorageNode]
if !ok {
continue
}
address := node.Address.Address
if node.LastIPPort != "" {
address = node.LastIPPort
}
_, err := signer.Sign(ctx, storj.NodeURL{
ID: piece.StorageNode,
Address: address,
}, int32(piece.Number))
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
if len(signer.AddressedLimits) >= int(neededLimits) {
break
}
}
if len(signer.AddressedLimits) < redundancy.RequiredCount() {
mon.Meter("download_failed_not_enough_pieces_uplink").Mark(1) //mon:locked
return nil, storj.PiecePrivateKey{}, ErrDownloadFailedNotEnoughPieces.New("not enough orderlimits: got %d, required %d", len(signer.AddressedLimits), redundancy.RequiredCount())
}
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}
return signer.AddressedLimits, signer.PrivateKey, nil
}
func (service *Service) perm(n int) []int {
service.rngMu.Lock()
defer service.rngMu.Unlock()