// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package metainfo

import (
	"context"
	"crypto/sha256"
	"errors"
	"strconv"
	"time"

	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/pkg/accounting"
	"storj.io/storj/pkg/eestream"
	"storj.io/storj/pkg/identity"
	"storj.io/storj/pkg/macaroon"
	"storj.io/storj/pkg/overlay"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/attribution"
	"storj.io/storj/satellite/console"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/storage"
)

const pieceHashExpiration = 2 * time.Hour

var (
	mon = monkit.Package()
	// Error is the general metainfo error class
	Error = errs.Class("metainfo error")
)

// APIKeys is the api key store methods used by the endpoint
type APIKeys interface {
	GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}

// Revocations is the revocations store methods used by the endpoint
type Revocations interface {
	GetByProjectID(ctx context.Context, projectID uuid.UUID) ([][]byte, error)
}

// Containment is a copy of the containment interface to avoid an import cycle
type Containment interface {
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}

// Endpoint is the metainfo endpoint
type Endpoint struct {
	log            *zap.Logger
	metainfo       *Service
	orders         *orders.Service
	cache          *overlay.Cache
	partnerinfo    attribution.DB
	projectUsage   *accounting.ProjectUsage
	containment    Containment
	apiKeys        APIKeys
	createRequests *createRequests
	rsConfig       RSConfig
}

// NewEndpoint creates a new metainfo endpoint instance
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, partnerinfo attribution.DB,
	containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint {
	// TODO do something with too many params
	return &Endpoint{
		log:            log,
		metainfo:       metainfo,
		orders:         orders,
		cache:          cache,
		partnerinfo:    partnerinfo,
		containment:    containment,
		apiKeys:        apiKeys,
		projectUsage:   projectUsage,
		createRequests: newCreateRequests(),
		rsConfig:       rsConfig,
	}
}

// Close closes resources
func (endpoint *Endpoint) Close() error { return nil }

// SegmentInfoOld returns segment metadata info
func (endpoint *Endpoint) SegmentInfoOld(ctx context.Context, req *pb.SegmentInfoRequestOld) (resp *pb.SegmentInfoResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionRead,
		Bucket:        req.Bucket,
		EncryptedPath: req.Path,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	path, err := CreatePath(ctx, keyInfo.ProjectID, req.Segment, req.Bucket, req.Path)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// TODO refactor to use []byte directly
	pointer, err := endpoint.metainfo.Get(ctx, path)
	if err != nil {
		if storage.ErrKeyNotFound.Has(err) {
			return nil, status.Error(codes.NotFound, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	return &pb.SegmentInfoResponseOld{Pointer: pointer}, nil
}

// CreateSegmentOld will generate the requested number of OrderLimits with corresponding node addresses for them
func (endpoint *Endpoint) CreateSegmentOld(ctx context.Context, req *pb.SegmentWriteRequestOld) (resp *pb.SegmentWriteResponseOld, err error) {
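	// A high-level sketch of the flow below: authorize the macaroon, validate
	// the bucket and the requested redundancy scheme, enforce the project
	// storage limit, pick candidate storage nodes from the overlay cache, and
	// return the put order limits together with a fresh root piece ID.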
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        req.Bucket,
		EncryptedPath: req.Path,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	err = endpoint.validateRedundancy(ctx, req.Redundancy)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
	if err != nil {
		endpoint.log.Error("retrieving project storage totals", zap.Error(err))
	}
	if exceeded {
		endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s",
			limit, keyInfo.ProjectID,
		)
		return nil, status.Error(codes.ResourceExhausted, "Exceeded Usage Limit")
	}

	redundancy, err := eestream.NewRedundancyStrategyFromProto(req.GetRedundancy())
	if err != nil {
		return nil, err
	}

	maxPieceSize := eestream.CalcPieceSize(req.GetMaxEncryptedSegmentSize(), redundancy)

	request := overlay.FindStorageNodesRequest{
		RequestedCount: int(req.Redundancy.Total),
		FreeBandwidth:  maxPieceSize,
		FreeDisk:       maxPieceSize,
	}
	nodes, err := endpoint.cache.FindStorageNodes(ctx, request)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	uplinkIdentity, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	bucketID := createBucketID(keyInfo.ProjectID, req.Bucket)
	rootPieceID, addressedLimits, err := endpoint.orders.CreatePutOrderLimits(ctx, uplinkIdentity, bucketID, nodes, req.Expiration, maxPieceSize)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	if len(addressedLimits) > 0 {
		endpoint.createRequests.Put(addressedLimits[0].Limit.SerialNumber, &createRequest{
			Expiration: req.Expiration,
			Redundancy: req.Redundancy,
		})
	}

	return &pb.SegmentWriteResponseOld{AddressedLimits: addressedLimits, RootPieceId: rootPieceID}, nil
}

func calculateSpaceUsed(ptr *pb.Pointer) (inlineSpace, remoteSpace int64) {
	inline := ptr.GetInlineSegment()
	if inline != nil {
		return int64(len(inline)), 0
	}
	segmentSize := ptr.GetSegmentSize()
	remote := ptr.GetRemote()
	if remote == nil {
		return 0, 0
	}
	minReq := remote.GetRedundancy().GetMinReq()
	pieceSize := segmentSize / int64(minReq)
	pieces := remote.GetRemotePieces()
	return 0, pieceSize * int64(len(pieces))
}

// CommitSegmentOld commits segment metadata
func (endpoint *Endpoint) CommitSegmentOld(ctx context.Context, req *pb.SegmentCommitRequestOld) (resp *pb.SegmentCommitResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        req.Bucket,
		EncryptedPath: req.Path,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	err = endpoint.validateCommitSegment(ctx, req)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	err = endpoint.filterValidPieces(ctx, req.Pointer, req.OriginalLimits)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	path, err := CreatePath(ctx, keyInfo.ProjectID, req.Segment, req.Bucket, req.Path)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

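	// Track the space consumed by this segment against the project's usage
	// totals: for an inline segment only inlineUsed is non-zero, while for a
	// remote segment remoteUsed is pieceSize * pieceCount as computed by
	// calculateSpaceUsed above.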
	inlineUsed, remoteUsed := calculateSpaceUsed(req.Pointer)
	if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, remoteUsed); err != nil {
		endpoint.log.Sugar().Errorf("Could not track new storage usage by project %v: %v", keyInfo.ProjectID, err)
		// but continue. it's most likely our own fault that we couldn't track it, and the only thing
		// that will be affected is our per-project bandwidth and storage limits.
	}

	err = endpoint.metainfo.Put(ctx, path, req.Pointer)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	if req.Pointer.Type == pb.Pointer_INLINE {
		// TODO or maybe use pointer.SegmentSize ??
		err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, req.Bucket, int64(len(req.Pointer.InlineSegment)))
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	pointer, err := endpoint.metainfo.Get(ctx, path)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	if len(req.OriginalLimits) > 0 {
		endpoint.createRequests.Remove(req.OriginalLimits[0].SerialNumber)
	}

	return &pb.SegmentCommitResponseOld{Pointer: pointer}, nil
}

// DownloadSegmentOld gets the Pointer in case of INLINE data, or the list of OrderLimits necessary to download remote data
func (endpoint *Endpoint) DownloadSegmentOld(ctx context.Context, req *pb.SegmentDownloadRequestOld) (resp *pb.SegmentDownloadResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionRead,
		Bucket:        req.Bucket,
		EncryptedPath: req.Path,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	bucketID := createBucketID(keyInfo.ProjectID, req.Bucket)
	exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID, bucketID)
	if err != nil {
		endpoint.log.Error("retrieving project bandwidth total", zap.Error(err))
	}
	if exceeded {
		endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for bandwidth for projectID %s.",
			limit, keyInfo.ProjectID,
		)
		return nil, status.Error(codes.ResourceExhausted, "Exceeded Usage Limit")
	}

	path, err := CreatePath(ctx, keyInfo.ProjectID, req.Segment, req.Bucket, req.Path)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// TODO refactor to use []byte directly
	pointer, err := endpoint.metainfo.Get(ctx, path)
	if err != nil {
		if storage.ErrKeyNotFound.Has(err) {
			return nil, status.Error(codes.NotFound, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	if pointer.Type == pb.Pointer_INLINE {
		// TODO or maybe use pointer.SegmentSize ??
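		// Inline data lives in the pointer itself, so no order limits are
		// needed; record the inline bandwidth usage and return the pointer
		// directly.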
		err := endpoint.orders.UpdateGetInlineOrder(ctx, keyInfo.ProjectID, req.Bucket, int64(len(pointer.InlineSegment)))
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		return &pb.SegmentDownloadResponseOld{Pointer: pointer}, nil
	} else if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
		uplinkIdentity, err := identity.PeerIdentityFromContext(ctx)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		limits, err := endpoint.orders.CreateGetOrderLimits(ctx, uplinkIdentity, bucketID, pointer)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		return &pb.SegmentDownloadResponseOld{Pointer: pointer, AddressedLimits: limits}, nil
	}

	return &pb.SegmentDownloadResponseOld{}, nil
}

// DeleteSegmentOld deletes segment metadata from the satellite and returns an OrderLimit array so the pieces can be removed from the storage nodes
func (endpoint *Endpoint) DeleteSegmentOld(ctx context.Context, req *pb.SegmentDeleteRequestOld) (resp *pb.SegmentDeleteResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionDelete,
		Bucket:        req.Bucket,
		EncryptedPath: req.Path,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	path, err := CreatePath(ctx, keyInfo.ProjectID, req.Segment, req.Bucket, req.Path)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	// TODO refactor to use []byte directly
	pointer, err := endpoint.metainfo.Get(ctx, path)
	if err != nil {
		if storage.ErrKeyNotFound.Has(err) {
			return nil, status.Error(codes.NotFound, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	err = endpoint.metainfo.Delete(ctx, path)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
		uplinkIdentity, err := identity.PeerIdentityFromContext(ctx)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}

		for _, piece := range pointer.GetRemote().GetRemotePieces() {
			_, err := endpoint.containment.Delete(ctx, piece.NodeId)
			if err != nil {
				return nil, status.Error(codes.Internal, err.Error())
			}
		}

		bucketID := createBucketID(keyInfo.ProjectID, req.Bucket)
		limits, err := endpoint.orders.CreateDeleteOrderLimits(ctx, uplinkIdentity, bucketID, pointer)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		return &pb.SegmentDeleteResponseOld{AddressedLimits: limits}, nil
	}

	return &pb.SegmentDeleteResponseOld{}, nil
}

// ListSegmentsOld returns all Path keys in the Pointers bucket
func (endpoint *Endpoint) ListSegmentsOld(ctx context.Context, req *pb.ListSegmentsRequestOld) (resp *pb.ListSegmentsResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionList,
		Bucket:        req.Bucket,
		EncryptedPath: req.Prefix,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	prefix, err := CreatePath(ctx, keyInfo.ProjectID, -1, req.Bucket, req.Prefix)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	items, more, err := endpoint.metainfo.List(ctx, prefix, string(req.StartAfter), string(req.EndBefore), req.Recursive, req.Limit, req.MetaFlags)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "ListV2: %v", err)
	}

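	// Convert the storage-layer list items into protobuf response items,
	// preserving IsPrefix so the client can tell path prefixes apart from
	// actual segments.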
	segmentItems := make([]*pb.ListSegmentsResponseOld_Item, len(items))
	for i, item := range items {
		segmentItems[i] = &pb.ListSegmentsResponseOld_Item{
			Path:     []byte(item.Path),
			Pointer:  item.Pointer,
			IsPrefix: item.IsPrefix,
		}
	}

	return &pb.ListSegmentsResponseOld{Items: segmentItems, More: more}, nil
}

func createBucketID(projectID uuid.UUID, bucket []byte) []byte {
	entries := make([]string, 0)
	entries = append(entries, projectID.String())
	entries = append(entries, string(bucket))
	return []byte(storj.JoinPaths(entries...))
}

func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Pointer, limits []*pb.OrderLimit) (err error) {
	defer mon.Task()(&ctx)(&err)

	if pointer.Type == pb.Pointer_REMOTE {
		var remotePieces []*pb.RemotePiece
		remote := pointer.Remote
		allSizesValid := true
		lastPieceSize := int64(0)
		for _, piece := range remote.RemotePieces {
			// TODO enable verification

			// err := auth.VerifyMsg(piece.Hash, piece.NodeId)
			// if err == nil {
			// 	// set to nil after verification to avoid storing in DB
			// 	piece.Hash = nil
			// 	remotePieces = append(remotePieces, piece)
			// } else {
			// 	// TODO satellite should send Delete request for piece that failed
			// 	s.logger.Warn("unable to verify piece hash: %v", zap.Error(err))
			// }

			err = endpoint.validatePieceHash(ctx, piece, limits)
			if err != nil {
				// TODO maybe this should be logged also to uplink too
				endpoint.log.Sugar().Warn(err)
				continue
			}

			if piece.Hash.PieceSize <= 0 || (lastPieceSize > 0 && lastPieceSize != piece.Hash.PieceSize) {
				allSizesValid = false
				break
			}
			lastPieceSize = piece.Hash.PieceSize
			remotePieces = append(remotePieces, piece)
		}

		if allSizesValid {
			redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
			if err != nil {
				return Error.Wrap(err)
			}

			expectedPieceSize := eestream.CalcPieceSize(pointer.SegmentSize, redundancy)
			if expectedPieceSize != lastPieceSize {
				return Error.New("expected piece size is different from provided (%v != %v)", expectedPieceSize, lastPieceSize)
			}
		} else {
			return Error.New("all pieces need to have the same size")
		}

		// we repair when the number of healthy pieces is less than or equal to the repair threshold,
		// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
		if int32(len(remotePieces)) <= remote.Redundancy.RepairThreshold && int32(len(remotePieces)) < remote.Redundancy.SuccessThreshold {
			return Error.New("Number of valid pieces (%d) is less than or equal to the repair threshold (%d)",
				len(remotePieces),
				remote.Redundancy.RepairThreshold,
			)
		}

		remote.RemotePieces = remotePieces
	}

	return nil
}

// CreatePath will create a Segment path
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ storj.Path, err error) {
	defer mon.Task()(&ctx)(&err)
	if segmentIndex < -1 {
		return "", errors.New("invalid segment index")
	}
	segment := "l"
	if segmentIndex > -1 {
		segment = "s" + strconv.FormatInt(segmentIndex, 10)
	}

	entries := make([]string, 0)
	entries = append(entries, projectID.String())
	entries = append(entries, segment)
	if len(bucket) != 0 {
		entries = append(entries, string(bucket))
	}
	if len(path) != 0 {
		entries = append(entries, string(path))
	}
	return storj.JoinPaths(entries...), nil
}
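
// Illustrative sketch of CreatePath (hypothetical values, not part of the
// API): the path is the project UUID, a segment marker, then the bucket and
// encrypted path, e.g.
//
//	p, _ := CreatePath(ctx, projectID, 1, []byte("photos"), []byte("raw/pic.jpg"))
//	// p == "<project-uuid>/s1/photos/raw/pic.jpg"
//	last, _ := CreatePath(ctx, projectID, -1, []byte("photos"), []byte("raw/pic.jpg"))
//	// last == "<project-uuid>/l/photos/raw/pic.jpg" ("l" marks the last segment)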

// SetAttributionOld tries to add attribution to the bucket.
func (endpoint *Endpoint) SetAttributionOld(ctx context.Context, req *pb.SetAttributionRequestOld) (_ *pb.SetAttributionResponseOld, err error) {
	defer mon.Task()(&ctx)(&err)

	// try to add an attribution that doesn't exist
	partnerID, err := bytesToUUID(req.GetPartnerId())
	if err != nil {
		return nil, Error.Wrap(err)
	}

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:            macaroon.ActionList,
		Bucket:        req.BucketName,
		EncryptedPath: []byte(""),
		Time:          time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	// check if attribution is set for given bucket
	_, err = endpoint.partnerinfo.Get(ctx, keyInfo.ProjectID, req.GetBucketName())
	if err == nil {
		endpoint.log.Sugar().Info("Bucket:", string(req.BucketName), " PartnerID:", partnerID.String(), " already attributed")
		return &pb.SetAttributionResponseOld{}, nil
	}
	if !attribution.ErrBucketNotAttributed.Has(err) {
		// try to set the attribution only when it's missing
		return nil, Error.Wrap(err)
	}

	prefix, err := CreatePath(ctx, keyInfo.ProjectID, -1, req.BucketName, []byte(""))
	if err != nil {
		return nil, Error.Wrap(err)
	}

	items, _, err := endpoint.metainfo.List(ctx, prefix, "", "", true, 1, 0)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	if len(items) > 0 {
		return nil, Error.New("Bucket(%q), PartnerID(%s) cannot be attributed", req.BucketName, req.PartnerId)
	}

	_, err = endpoint.partnerinfo.Insert(ctx, &attribution.Info{
		ProjectID:  keyInfo.ProjectID,
		BucketName: req.GetBucketName(),
		PartnerID:  partnerID,
	})
	if err != nil {
		return nil, Error.Wrap(err)
	}
	return &pb.SetAttributionResponseOld{}, nil
}

// bytesToUUID is used to convert []byte to UUID
func bytesToUUID(data []byte) (uuid.UUID, error) {
	var id uuid.UUID

	copy(id[:], data)
	if len(id) != len(data) {
		return uuid.UUID{}, errs.New("Invalid uuid")
	}

	return id, nil
}
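
// Note on bytesToUUID (an observation, not an invariant enforced elsewhere):
// uuid.UUID is a fixed-size byte array, so the length check above rejects any
// input that is not exactly len(id) bytes; shorter or longer slices yield an
// "Invalid uuid" error rather than a truncated or zero-padded ID.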

// ProjectInfo returns allowed ProjectInfo for the provided API key
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
		Op:   macaroon.ActionProjectInfo,
		Time: time.Now(),
	})
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, err.Error())
	}

	salt := sha256.Sum256(keyInfo.ProjectID[:])

	return &pb.ProjectInfoResponse{
		ProjectSalt: salt[:],
	}, nil
}

// CreateBucket creates a bucket
func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreateRequest) (_ *pb.BucketCreateResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: placeholder to implement pb.MetainfoServer interface.
	return &pb.BucketCreateResponse{}, err
}

// GetBucket gets a bucket
func (endpoint *Endpoint) GetBucket(ctx context.Context, req *pb.BucketGetRequest) (_ *pb.BucketGetResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: placeholder to implement pb.MetainfoServer interface.
	return &pb.BucketGetResponse{}, err
}

// DeleteBucket deletes a bucket
func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDeleteRequest) (_ *pb.BucketDeleteResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: placeholder to implement pb.MetainfoServer interface.
	return &pb.BucketDeleteResponse{}, err
}

// ListBuckets returns a list of buckets
func (endpoint *Endpoint) ListBuckets(ctx context.Context, req *pb.BucketListRequest) (_ *pb.BucketListResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: placeholder to implement pb.MetainfoServer interface.
	return &pb.BucketListResponse{}, err
}

// SetBucketAttribution sets the bucket attribution
func (endpoint *Endpoint) SetBucketAttribution(ctx context.Context, req *pb.BucketSetAttributionRequest) (_ *pb.BucketSetAttributionResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: placeholder to implement pb.MetainfoServer interface.
	return &pb.BucketSetAttributionResponse{}, err
}