From 351aa70eb7cc56772a49c023afe25505d79a5f0a Mon Sep 17 00:00:00 2001
From: Isaac Hess
Date: Thu, 13 Aug 2020 10:43:21 -0600
Subject: [PATCH] satellite/metainfo: Implement GetObjectIPs

Change-Id: Ibabbe7c555b790498d28a6ac4c95fcf2f7376978
---
 satellite/metainfo/metainfo.go | 79 +++++++++++++++++++++++++++++++++-
 1 file changed, 78 insertions(+), 1 deletion(-)

diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go
index 2ed13d587..89048f5cd 100644
--- a/satellite/metainfo/metainfo.go
+++ b/satellite/metainfo/metainfo.go
@@ -1116,7 +1116,84 @@ func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.Object
 func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPsRequest) (resp *pb.ObjectGetIPsResponse, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	return nil, rpcstatus.Error(rpcstatus.Unimplemented, "GetObjectIPs unimplemented")
+	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
+		Op:            macaroon.ActionRead,
+		Bucket:        req.Bucket,
+		EncryptedPath: req.EncryptedPath,
+		Time:          time.Now(),
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	err = endpoint.validateBucket(ctx, req.Bucket)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+	}
+
+	lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
+	if err != nil {
+		// endpoint.getPointer already returns valid rpcstatus errors
+		return nil, err
+	}
+
+	var nodeIDs []storj.NodeID
+	addPointerToNodeIDs := func(pointer *pb.Pointer) {
+		if pointer.Remote != nil {
+			for _, piece := range pointer.Remote.RemotePieces {
+				nodeIDs = append(nodeIDs, piece.NodeId)
+			}
+		}
+	}
+
+	addPointerToNodeIDs(lastPointer)
+
+	streamMeta := &pb.StreamMeta{}
+	err = pb.Unmarshal(lastPointer.Metadata, streamMeta)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	numSegmentsKnown := streamMeta.NumberOfSegments > 0
+	numberOfSegmentsToFetch := int(streamMeta.NumberOfSegments) - 1 // remove the last segment since we've already fetched it manually
+
+	// If we do not know the number of segments from the streamMeta, we want to
+	// continue to run this loop until it cannot find another segment (the
+	// break condition in the loop).
+	//
+	// If we do know the number of segments, we want to run the loop as long as
+	// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
+	// segments.
+	for i := 0; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch); i++ {
+		path, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
+		if err != nil {
+			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		}
+
+		pointer, err := endpoint.metainfo.Get(ctx, path)
+		if err != nil {
+			if storj.ErrObjectNotFound.Has(err) {
+				break
+			}
+			return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		}
+		addPointerToNodeIDs(pointer)
+	}
+
+	nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	resp = &pb.ObjectGetIPsResponse{}
+	for _, node := range nodes {
+		address := node.Address.GetAddress()
+		if address != "" {
+			resp.Ips = append(resp.Ips, []byte(address))
+		}
+	}
+
+	return resp, nil
 }
 
 // BeginSegment begins segment uploading.
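
Note on the segment walk above: the handler reads the last segment pointer first (it carries the
StreamMeta), then visits segment indexes 0..NumberOfSegments-2 when the count is known, or keeps
probing indexes until one is missing when it is not. The sketch below restates just that control
flow with hypothetical names (collectSegmentIndexes, fetch, errNotFound); it is an illustration
under those assumptions, not code from this patch.

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for storj.ErrObjectNotFound in this sketch.
var errNotFound = errors.New("segment not found")

// collectSegmentIndexes mirrors the loop in GetObjectIPs: when numberOfSegments
// is known (> 0) it visits indexes 0..numberOfSegments-2 (the last segment was
// already fetched separately), otherwise it probes indexes until fetch reports
// not-found.
func collectSegmentIndexes(numberOfSegments int64, fetch func(index int) error) ([]int, error) {
	known := numberOfSegments > 0
	toFetch := int(numberOfSegments) - 1 // the last segment is handled separately

	var visited []int
	for i := 0; !known || (toFetch > 0 && i < toFetch); i++ {
		if err := fetch(i); err != nil {
			if errors.Is(err, errNotFound) {
				break // unknown count: stop at the first missing segment
			}
			return nil, err
		}
		visited = append(visited, i)
	}
	return visited, nil
}

func main() {
	// Known count: a 4-segment object visits indexes 0, 1, 2 (the last segment was already read).
	got, _ := collectSegmentIndexes(4, func(int) error { return nil })
	fmt.Println("known count:", got)

	// Unknown count: probe until a segment is missing, here after index 2.
	got, _ = collectSegmentIndexes(0, func(i int) error {
		if i > 2 {
			return errNotFound
		}
		return nil
	})
	fmt.Println("unknown count:", got)
}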
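
A second note: resp.Ips is filled from node.Address.GetAddress(), so each entry is the node's
last known dial address rather than necessarily a bare IP. A client that wants literal IPs would
still have to split and resolve those strings. The stdlib-only sketch below shows that step under
the assumption that the entries arrive as "host:port" byte strings; resolveNodeAddresses is a
hypothetical helper, not part of this patch or the uplink API.

package main

import (
	"fmt"
	"net"
)

// resolveNodeAddresses turns "host:port" byte strings (as returned in
// ObjectGetIPsResponse.Ips under the assumption above) into resolved IPs.
// Malformed or unresolvable entries are skipped rather than failing the call.
func resolveNodeAddresses(addresses [][]byte) []net.IP {
	var ips []net.IP
	for _, addr := range addresses {
		host, _, err := net.SplitHostPort(string(addr))
		if err != nil {
			continue // malformed address; skip it in this sketch
		}
		if ip := net.ParseIP(host); ip != nil {
			ips = append(ips, ip) // already a literal IP
			continue
		}
		resolved, err := net.LookupIP(host)
		if err != nil {
			continue // hostname did not resolve; skip it
		}
		ips = append(ips, resolved...)
	}
	return ips
}

func main() {
	sample := [][]byte{
		[]byte("203.0.113.7:28967"),
		[]byte("storagenode.example.test:28967"),
	}
	for _, ip := range resolveNodeAddresses(sample) {
		fmt.Println(ip)
	}
}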