storj/uplink/metainfo/client.go

226 lines
7.4 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"time"
"github.com/golang/protobuf/ptypes"
2019-03-20 21:12:00 +00:00
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/zeebo/errs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/storage"
)
// mon is the monkit scope for this package; the methods below start their
// tasks from it.
var mon = monkit.Package()

// Error is the errs class of standard metainfo errors.
var Error = errs.Class("metainfo error")
// Metainfo implements Client by forwarding every call to a
// pb.MetainfoClient.
type Metainfo struct {
// client performs the underlying Metainfo RPC calls.
client pb.MetainfoClient
}
// New returns a Metainfo that sends its requests through the given
// pb.MetainfoClient.
func New(gcclient pb.MetainfoClient) (metainfo *Metainfo) {
	metainfo = &Metainfo{}
	metainfo.client = gcclient
	return metainfo
}
// Compile-time assertion that *Metainfo implements Client; the build fails
// here if a Client method is missing or has a different signature.
var _ Client = (*Metainfo)(nil)
// ListItem is a single item in a listing, as returned by ListSegments.
type ListItem struct {
// Path is the item's path as reported by the service.
Path storj.Path
// Pointer is the pointer reported for the item; it may be nil when the
// response carries none.
Pointer *pb.Pointer
// IsPrefix is copied verbatim from the response item.
// NOTE(review): presumably marks an entry that stands for a path prefix
// rather than a stored segment; confirm against the service definition.
IsPrefix bool
}
// Client is the interface for talking to the Metainfo service. Segments are
// addressed by bucket, path and segment index.
type Client interface {
// CreateSegment requests the order limits for creating a new segment and
// also returns the root piece ID to use for it.
CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) ([]*pb.AddressedOrderLimit, storj.PieceID, error)
// CommitSegment requests to store the pointer for the segment and returns
// the pointer reported back by the service.
CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (*pb.Pointer, error)
// SegmentInfo requests the pointer of a segment.
SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, error)
// ReadSegment requests the pointer and the order limits for reading a
// segment.
ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, []*pb.AddressedOrderLimit, error)
// DeleteSegment requests the order limits for deleting a segment.
DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) ([]*pb.AddressedOrderLimit, error)
// ListSegments lists the available segments; more reports whether the
// service indicated that further items exist.
ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error)
}
// NewClient dials address through tc and returns a Metainfo backed by the
// resulting connection. A unary interceptor built from APIKey is installed on
// the connection. A dial failure is returned wrapped in Error.
func NewClient(ctx context.Context, tc transport.Client, address string, APIKey string) (*Metainfo, error) {
	withAPIKey := grpc.WithUnaryInterceptor(grpcauth.NewAPIKeyInjector(APIKey))

	conn, dialErr := tc.DialAddress(ctx, address, withAPIKey)
	if dialErr != nil {
		return nil, Error.Wrap(dialErr)
	}

	rpcClient := pb.NewMetainfoClient(conn)
	return &Metainfo{client: rpcClient}, nil
}
// CreateSegment requests the order limits for creating a new segment
func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, err error) {
defer mon.Task()(&ctx)(&err)
2019-03-20 21:12:00 +00:00
var exp *timestamp.Timestamp
if !expiration.IsZero() {
exp, err = ptypes.TimestampProto(expiration)
if err != nil {
return nil, rootPieceID, err
}
}
response, err := metainfo.client.CreateSegment(ctx, &pb.SegmentWriteRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
Redundancy: redundancy,
MaxEncryptedSegmentSize: maxEncryptedSegmentSize,
Expiration: exp,
})
if err != nil {
return nil, rootPieceID, Error.Wrap(err)
}
return response.GetAddressedLimits(), response.RootPieceId, nil
}
// CommitSegment asks the service to store pointer for the given segment,
// passing along the original order limits. It returns the pointer from the
// response; an RPC failure is returned wrapped in Error.
func (metainfo *Metainfo) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (savedPointer *pb.Pointer, err error) {
	defer mon.Task()(&ctx)(&err)

	request := &pb.SegmentCommitRequest{
		Bucket:         []byte(bucket),
		Path:           []byte(path),
		Segment:        segmentIndex,
		Pointer:        pointer,
		OriginalLimits: originalLimits,
	}

	reply, err := metainfo.client.CommitSegment(ctx, request)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	savedPointer = reply.GetPointer()
	return savedPointer, nil
}
// SegmentInfo fetches the pointer of a segment. A gRPC NotFound status is
// reported as storage.ErrKeyNotFound; every other failure is wrapped in
// Error.
func (metainfo *Metainfo) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) {
	defer mon.Task()(&ctx)(&err)

	request := &pb.SegmentInfoRequest{
		Bucket:  []byte(bucket),
		Path:    []byte(path),
		Segment: segmentIndex,
	}

	reply, err := metainfo.client.SegmentInfo(ctx, request)
	switch {
	case err == nil:
		return reply.GetPointer(), nil
	case status.Code(err) == codes.NotFound:
		return nil, storage.ErrKeyNotFound.Wrap(err)
	default:
		return nil, Error.Wrap(err)
	}
}
// ReadSegment requests the pointer and the order limits for reading a
// segment. The returned limits are arranged by sortLimits, so they line up
// with the pointer's pieces. A gRPC NotFound status is reported as
// storage.ErrKeyNotFound; every other failure is wrapped in Error.
func (metainfo *Metainfo) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, err error) {
	defer mon.Task()(&ctx)(&err)

	request := &pb.SegmentDownloadRequest{
		Bucket:  []byte(bucket),
		Path:    []byte(path),
		Segment: segmentIndex,
	}

	reply, err := metainfo.client.DownloadSegment(ctx, request)
	switch {
	case err == nil:
		pointer = reply.GetPointer()
		return pointer, sortLimits(reply.GetAddressedLimits(), pointer), nil
	case status.Code(err) == codes.NotFound:
		return nil, nil, storage.ErrKeyNotFound.Wrap(err)
	default:
		return nil, nil, Error.Wrap(err)
	}
}
// sortLimits arranges order limits by piece number and leaves nil for every
// piece number that has no matching limit.
//
// The result has one slot per piece in the pointer's redundancy scheme (its
// total). Each remote piece in the pointer selects the limit whose storage
// node ID equals the piece's node ID and stores it at the piece's number.
//
// ReadSegment passes a pointer taken straight from an RPC response, so
// malformed data must not crash the caller: nil pieces and pieces whose
// number falls outside [0, total) are skipped, and a negative total yields a
// nil result.
func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit {
	total := pointer.GetRemote().GetRedundancy().GetTotal()
	if total < 0 {
		// make would panic on a negative length.
		return nil
	}

	sorted := make([]*pb.AddressedOrderLimit, total)
	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		if piece == nil {
			// piece.NodeId below is a plain field access and is not nil-safe.
			continue
		}
		num := piece.GetPieceNum()
		if num < 0 || int(num) >= len(sorted) {
			// Indexing with an out-of-range piece number would panic.
			continue
		}
		sorted[num] = getLimitByStorageNodeID(limits, piece.NodeId)
	}
	return sorted
}
// getLimitByStorageNodeID returns the first entry of limits whose order limit
// is issued for storageNodeID, or nil when there is none.
//
// Entries that are nil or carry no order limit are skipped: reading
// StorageNodeId through a nil limit would panic.
func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit {
	for _, limit := range limits {
		orderLimit := limit.GetLimit()
		if orderLimit == nil {
			continue
		}
		if orderLimit.StorageNodeId == storageNodeID {
			return limit
		}
	}
	return nil
}
// DeleteSegment requests the order limits for deleting a segment. A gRPC
// NotFound status is reported as storage.ErrKeyNotFound; every other failure
// is wrapped in Error.
func (metainfo *Metainfo) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, err error) {
	defer mon.Task()(&ctx)(&err)

	request := &pb.SegmentDeleteRequest{
		Bucket:  []byte(bucket),
		Path:    []byte(path),
		Segment: segmentIndex,
	}

	reply, err := metainfo.client.DeleteSegment(ctx, request)
	switch {
	case err == nil:
		return reply.GetAddressedLimits(), nil
	case status.Code(err) == codes.NotFound:
		return nil, storage.ErrKeyNotFound.Wrap(err)
	default:
		return nil, Error.Wrap(err)
	}
}
// ListSegments lists the available segments in bucket, forwarding the prefix,
// range bounds, recursion flag, limit and meta flags to the service. Each
// response item is converted to a ListItem; more is taken from the response.
// An RPC failure is returned wrapped in Error.
func (metainfo *Metainfo) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) {
	defer mon.Task()(&ctx)(&err)

	request := &pb.ListSegmentsRequest{
		Bucket:     []byte(bucket),
		Prefix:     []byte(prefix),
		StartAfter: []byte(startAfter),
		EndBefore:  []byte(endBefore),
		Recursive:  recursive,
		Limit:      limit,
		MetaFlags:  metaFlags,
	}

	reply, err := metainfo.client.ListSegments(ctx, request)
	if err != nil {
		return nil, false, Error.Wrap(err)
	}

	entries := reply.GetItems()
	items = make([]ListItem, 0, len(entries))
	for _, entry := range entries {
		items = append(items, ListItem{
			Path:     storj.Path(entry.GetPath()),
			Pointer:  entry.GetPointer(),
			IsPrefix: entry.IsPrefix,
		})
	}

	return items, reply.GetMore(), nil
}