satellite/metainfo: Improve docs & use common funcs
* Use unexported existent method in logic that was duplicated in some exported methods. * Log a forgotten internal error. * Improve the documentation adding more and fixing some to fit to our code style conventions. Change-Id: Ie6f8bc59f9089f92b8b0d1b4c09c2142c3f273f5
This commit is contained in:
parent
ff854b3955
commit
770123de10
@ -656,7 +656,7 @@ func (endpoint *Endpoint) mapNodesFor(ctx context.Context, pieces []*pb.RemotePi
|
|||||||
return peerIDMap, nil
|
return peerIDMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreatePath will create a Segment path
|
// CreatePath creates a Segment path.
|
||||||
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ storj.Path, err error) {
|
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ storj.Path, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
if segmentIndex < -1 {
|
if segmentIndex < -1 {
|
||||||
@ -1370,7 +1370,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginDeleteObject begins object deletion process
|
// BeginDeleteObject begins object deletion process.
|
||||||
func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectBeginDeleteRequest) (resp *pb.ObjectBeginDeleteResponse, err error) {
|
func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectBeginDeleteRequest) (resp *pb.ObjectBeginDeleteResponse, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
@ -1411,7 +1411,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
|
|||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err = endpoint.getPointer(ctx, keyInfo.ProjectID, -1, satStreamID.Bucket, satStreamID.EncryptedPath)
|
_, _, err = endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, satStreamID.Bucket, satStreamID.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1856,17 +1856,9 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR
|
|||||||
limit = listLimit
|
limit = listLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
path, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
|
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, err
|
||||||
}
|
|
||||||
|
|
||||||
pointer, err := endpoint.metainfo.Get(ctx, path)
|
|
||||||
if err != nil {
|
|
||||||
if storj.ErrObjectNotFound.Has(err) {
|
|
||||||
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
|
||||||
}
|
|
||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMeta := &pb.StreamMeta{}
|
streamMeta := &pb.StreamMeta{}
|
||||||
@ -2110,11 +2102,12 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
|||||||
return &pb.SegmentDownloadResponse{}, rpcstatus.Error(rpcstatus.Internal, "invalid type of pointer")
|
return &pb.SegmentDownloadResponse{}, rpcstatus.Error(rpcstatus.Internal, "invalid type of pointer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPointer returns the pointer and the segment path projectID, bucket and
|
||||||
|
// encryptedPath. It returns an error with a specific RPC status.
|
||||||
func (endpoint *Endpoint) getPointer(
|
func (endpoint *Endpoint) getPointer(
|
||||||
ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, encryptedPath []byte,
|
ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, encryptedPath []byte,
|
||||||
) (_ *pb.Pointer, _ string, err error) {
|
) (_ *pb.Pointer, _ string, err error) {
|
||||||
defer mon.Task()(&ctx, projectID.String(), segmentIndex, bucket, encryptedPath)(&err)
|
defer mon.Task()(&ctx, projectID.String(), segmentIndex, bucket, encryptedPath)(&err)
|
||||||
|
|
||||||
path, err := CreatePath(ctx, projectID, segmentIndex, bucket, encryptedPath)
|
path, err := CreatePath(ctx, projectID, segmentIndex, bucket, encryptedPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, "", rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
@ -2125,6 +2118,8 @@ func (endpoint *Endpoint) getPointer(
|
|||||||
if storj.ErrObjectNotFound.Has(err) {
|
if storj.ErrObjectNotFound.Has(err) {
|
||||||
return nil, "", rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
return nil, "", rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
endpoint.log.Error("error getting the pointer from metainfo service", zap.Error(err))
|
||||||
return nil, "", rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, "", rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
return pointer, path, nil
|
return pointer, path, nil
|
||||||
|
@ -166,27 +166,18 @@ func (s *Service) UpdatePiecesCheckDuplicates(ctx context.Context, path string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets pointer from db
|
// Get gets decoded pointer from DB.
|
||||||
func (s *Service) Get(ctx context.Context, path string) (pointer *pb.Pointer, err error) {
|
func (s *Service) Get(ctx context.Context, path string) (_ *pb.Pointer, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
pointerBytes, err := s.db.Get(ctx, []byte(path))
|
_, pointer, err := s.GetWithBytes(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if storage.ErrKeyNotFound.Has(err) {
|
return nil, err
|
||||||
err = storj.ErrObjectNotFound.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
pointer = &pb.Pointer{}
|
|
||||||
err = proto.Unmarshal(pointerBytes, pointer)
|
|
||||||
if err != nil {
|
|
||||||
return nil, Error.Wrap(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pointer, nil
|
return pointer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetWithBytes gets pointer from db
|
// GetWithBytes gets the protocol buffers encoded and decoded pointer from the DB.
|
||||||
func (s *Service) GetWithBytes(ctx context.Context, path string) (pointerBytes []byte, pointer *pb.Pointer, err error) {
|
func (s *Service) GetWithBytes(ctx context.Context, path string) (pointerBytes []byte, pointer *pb.Pointer, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user