satellite/metainfo: Implement GetObjectIPs
Change-Id: Ibabbe7c555b790498d28a6ac4c95fcf2f7376978
This commit is contained in:
parent
2d01dd9732
commit
351aa70eb7
@ -1116,7 +1116,84 @@ func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.Object
|
|||||||
func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPsRequest) (resp *pb.ObjectGetIPsResponse, err error) {
|
func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPsRequest) (resp *pb.ObjectGetIPsResponse, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
return nil, rpcstatus.Error(rpcstatus.Unimplemented, "GetObjectIPs unimplemented")
|
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
|
||||||
|
Op: macaroon.ActionRead,
|
||||||
|
Bucket: req.Bucket,
|
||||||
|
EncryptedPath: req.EncryptedPath,
|
||||||
|
Time: time.Now(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = endpoint.validateBucket(ctx, req.Bucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
|
||||||
|
if err != nil {
|
||||||
|
// endpoint.getPointer already returns valid rpcstatus errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var nodeIDs []storj.NodeID
|
||||||
|
addPointerToNodeIDs := func(pointer *pb.Pointer) {
|
||||||
|
if pointer.Remote != nil {
|
||||||
|
for _, piece := range pointer.Remote.RemotePieces {
|
||||||
|
nodeIDs = append(nodeIDs, piece.NodeId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addPointerToNodeIDs(lastPointer)
|
||||||
|
|
||||||
|
streamMeta := &pb.StreamMeta{}
|
||||||
|
err = pb.Unmarshal(lastPointer.Metadata, streamMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
numSegmentsKnown := streamMeta.NumberOfSegments > 0
|
||||||
|
numberOfSegmentsToFetch := int(streamMeta.NumberOfSegments) - 1 // remove last segment since we've already fetch manually
|
||||||
|
|
||||||
|
// If we do not know the number of segments from the streamMeta, we want to
|
||||||
|
// continue to run this loop until it cannot find another segment (the
|
||||||
|
// break condition in the loop).
|
||||||
|
//
|
||||||
|
// If we do know the number of segments, we want to run the loop as long as
|
||||||
|
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
|
||||||
|
// segments.
|
||||||
|
for i := 0; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i+1 >= numberOfSegmentsToFetch); i++ {
|
||||||
|
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
pointer, err := endpoint.metainfo.Get(ctx, path)
|
||||||
|
if err != nil {
|
||||||
|
if storj.ErrObjectNotFound.Has(err) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
|
}
|
||||||
|
addPointerToNodeIDs(pointer)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = &pb.ObjectGetIPsResponse{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
address := node.Address.GetAddress()
|
||||||
|
if address != "" {
|
||||||
|
resp.Ips = append(resp.Ips, []byte(address))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginSegment begins segment uploading.
|
// BeginSegment begins segment uploading.
|
||||||
|
Loading…
Reference in New Issue
Block a user