storj/pkg/storage/segments/store.go
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package segments

import (
	"context"
	"io"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	"github.com/vivint/infectious"
	"go.uber.org/zap"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/pkg/eestream"
	"storj.io/storj/pkg/kademlia"
	"storj.io/storj/pkg/overlay"
	"storj.io/storj/pkg/paths"
	"storj.io/storj/pkg/piecestore/rpc/client"
	"storj.io/storj/pkg/pointerdb/pdbclient"
	"storj.io/storj/pkg/ranger"
	ecclient "storj.io/storj/pkg/storage/ec"
	opb "storj.io/storj/protos/overlay"
	ppb "storj.io/storj/protos/pointerdb"
)

var (
	mon = monkit.Package()
)

// Meta info about a segment
type Meta struct {
	Modified   time.Time
	Expiration time.Time
	Size       int64
	Data       []byte
}

// ListItem is a single item in a listing
type ListItem struct {
	Path paths.Path
	Meta Meta
}

// Store is a store for segments
type Store interface {
	// Meta returns the metadata of the segment at path
	Meta(ctx context.Context, path paths.Path) (meta Meta, err error)
	// Get returns a ranger over the segment at path along with its metadata
	Get(ctx context.Context, path paths.Path) (rr ranger.RangeCloser,
		meta Meta, err error)
	// Put stores data as a segment at path
	Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte,
		expiration time.Time) (meta Meta, err error)
	// Delete removes the segment at path
	Delete(ctx context.Context, path paths.Path) (err error)
	// List returns a paginated listing of segments and their metadata
	List(ctx context.Context, prefix, startAfter, endBefore paths.Path,
		recursive bool, limit int, metaFlags uint32) (items []ListItem,
		more bool, err error)
}

// segmentStore implements Store using overlay, erasure code, and pointerdb clients
type segmentStore struct {
	oc            overlay.Client
	ec            ecclient.Client
	pdb           pdbclient.Client
	rs            eestream.RedundancyStrategy
	thresholdSize int
}

// NewSegmentStore creates a new instance of segmentStore
func NewSegmentStore(oc overlay.Client, ec ecclient.Client,
	pdb pdbclient.Client, rs eestream.RedundancyStrategy, t int) Store {
	return &segmentStore{oc: oc, ec: ec, pdb: pdb, rs: rs, thresholdSize: t}
}

// Meta retrieves the metadata of the segment
func (s *segmentStore) Meta(ctx context.Context, path paths.Path) (meta Meta,
	err error) {
	defer mon.Task()(&ctx)(&err)

	pr, err := s.pdb.Get(ctx, path)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}
	return convertMeta(pr), nil
}

// Put uploads a segment to an erasure code client
func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
	metadata []byte, expiration time.Time) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	var p *ppb.Pointer

	exp, err := ptypes.TimestampProto(expiration)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}

	peekReader := NewPeekThresholdReader(data)
	remoteSized, err := peekReader.IsLargerThan(s.thresholdSize)
	if err != nil {
		return Meta{}, err
	}

	if !remoteSized {
		// the data fits under the threshold, so store it inline in the pointer
		p = &ppb.Pointer{
			Type:           ppb.Pointer_INLINE,
			InlineSegment:  peekReader.thresholdBuf,
			Size:           int64(len(peekReader.thresholdBuf)),
			ExpirationDate: exp,
			Metadata:       metadata,
		}
	} else {
		// use the overlay client to request a list of storage nodes
		nodes, err := s.oc.Choose(ctx, s.rs.TotalCount(), 0)
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}
		pieceID := client.NewPieceID()
		sizedReader := SizeReader(peekReader)

		// upload the erasure-coded pieces via the ecclient
		err = s.ec.Put(ctx, nodes, s.rs, pieceID, sizedReader, expiration)
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}

		p, err = s.makeRemotePointer(nodes, pieceID, sizedReader.Size(), exp, metadata)
		if err != nil {
			return Meta{}, err
		}
	}

	// put the pointer into the pointerDB
	err = s.pdb.Put(ctx, path, p)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}

	// get the metadata for the newly uploaded segment
	m, err := s.Meta(ctx, path)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}
	return m, nil
}
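
// Example use of Put (a sketch: ctx, store, and the paths.New helper are
// assumed to be in scope). Data smaller than the configured threshold is
// stored inline; anything larger is erasure-coded out to storage nodes:
//
//	data := bytes.NewReader([]byte("hello world"))
//	meta, err := store.Put(ctx, paths.New("bucket", "object"), data, nil,
//		time.Now().Add(24*time.Hour))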

// makeRemotePointer creates a pointer of type remote
func (s *segmentStore) makeRemotePointer(nodes []*opb.Node, pieceID client.PieceID, readerSize int64,
	exp *timestamp.Timestamp, metadata []byte) (pointer *ppb.Pointer, err error) {
	var remotePieces []*ppb.RemotePiece
	for i := range nodes {
		remotePieces = append(remotePieces, &ppb.RemotePiece{
			PieceNum: int32(i),
			NodeId:   nodes[i].Id,
		})
	}

	pointer = &ppb.Pointer{
		Type: ppb.Pointer_REMOTE,
		Remote: &ppb.RemoteSegment{
			Redundancy: &ppb.RedundancyScheme{
				Type:             ppb.RedundancyScheme_RS,
				MinReq:           int32(s.rs.RequiredCount()),
				Total:            int32(s.rs.TotalCount()),
				RepairThreshold:  int32(s.rs.Min),
				SuccessThreshold: int32(s.rs.Opt),
				ErasureShareSize: int32(s.rs.EncodedBlockSize()),
			},
			PieceId:      string(pieceID),
			RemotePieces: remotePieces,
		},
		Size:           readerSize,
		ExpirationDate: exp,
		Metadata:       metadata,
	}
	return pointer, nil
}
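
// With a 20-of-40 strategy, for instance, the pointer built above records 40
// RemotePieces, where piece i is pinned to the i-th chosen node:
//
//	Remote.RemotePieces[i] = &ppb.RemotePiece{PieceNum: int32(i), NodeId: nodes[i].Id}
//
// Get and Delete later reverse this mapping via lookupNodes.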

// Get retrieves a segment using erasure code, overlay, and pointerdb clients
func (s *segmentStore) Get(ctx context.Context, path paths.Path) (
	rr ranger.RangeCloser, meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	pr, err := s.pdb.Get(ctx, path)
	if err != nil {
		return nil, Meta{}, Error.Wrap(err)
	}

	if pr.GetType() == ppb.Pointer_REMOTE {
		seg := pr.GetRemote()
		pid := client.PieceID(seg.PieceId)

		nodes, err := s.lookupNodes(ctx, seg)
		if err != nil {
			return nil, Meta{}, Error.Wrap(err)
		}

		es, err := makeErasureScheme(pr.GetRemote().GetRedundancy())
		if err != nil {
			return nil, Meta{}, err
		}

		rr, err = s.ec.Get(ctx, nodes, es, pid, pr.GetSize())
		if err != nil {
			return nil, Meta{}, Error.Wrap(err)
		}
	} else {
		rr = ranger.ByteRangeCloser(pr.InlineSegment)
	}

	return rr, convertMeta(pr), nil
}
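
// A read sketch for the returned ranger (the exact Range signature on
// ranger.RangeCloser is an assumption here and may differ by version):
//
//	rr, meta, err := store.Get(ctx, path)
//	if err != nil {
//		return err
//	}
//	defer rr.Close()
//	_, err = io.Copy(dst, rr.Range(0, meta.Size)) // read the whole segment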

// makeErasureScheme reconstructs an erasure scheme from a stored redundancy scheme
func makeErasureScheme(rs *ppb.RedundancyScheme) (eestream.ErasureScheme, error) {
	fc, err := infectious.NewFEC(int(rs.GetMinReq()), int(rs.GetTotal()))
	if err != nil {
		return nil, Error.Wrap(err)
	}
	es := eestream.NewRSScheme(fc, int(rs.GetErasureShareSize()))
	return es, nil
}
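
// Worked example: a scheme with MinReq=20, Total=40, and ErasureShareSize=1024
// encodes each 20 KiB stripe of segment data into 40 shares of 1 KiB each; any
// 20 shares reconstruct the stripe, at a 2x storage expansion (40/20).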

// Delete tells piece stores to delete a segment and deletes pointer from pointerdb
func (s *segmentStore) Delete(ctx context.Context, path paths.Path) (err error) {
	defer mon.Task()(&ctx)(&err)

	pr, err := s.pdb.Get(ctx, path)
	if err != nil {
		return Error.Wrap(err)
	}

	if pr.GetType() == ppb.Pointer_REMOTE {
		seg := pr.GetRemote()
		pid := client.PieceID(seg.PieceId)
		nodes, err := s.lookupNodes(ctx, seg)
		if err != nil {
			return Error.Wrap(err)
		}

		// ecclient sends delete request
		err = s.ec.Delete(ctx, nodes, pid)
		if err != nil {
			return Error.Wrap(err)
		}
	}

	// deletes pointer from pointerdb
	return s.pdb.Delete(ctx, path)
}

// lookupNodes calls Lookup to get node addresses from the overlay
func (s *segmentStore) lookupNodes(ctx context.Context, seg *ppb.RemoteSegment) (
	nodes []*opb.Node, err error) {
	nodes = make([]*opb.Node, len(seg.GetRemotePieces()))
	for i, p := range seg.GetRemotePieces() {
		node, err := s.oc.Lookup(ctx, kademlia.StringToNodeID(p.GetNodeId()))
		if err != nil {
			// TODO(kaloyan): better error handling: failing to look up a few
			// nodes should not fail the request
			return nil, Error.Wrap(err)
		}
		nodes[i] = node
	}
	return nodes, nil
}

// List retrieves paths to segments and their metadata stored in the pointerdb
func (s *segmentStore) List(ctx context.Context, prefix, startAfter,
	endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (
	items []ListItem, more bool, err error) {
	defer mon.Task()(&ctx)(&err)

	pdbItems, more, err := s.pdb.List(ctx, prefix, startAfter, endBefore,
		recursive, limit, metaFlags)
	if err != nil {
		return nil, false, err
	}

	items = make([]ListItem, len(pdbItems))
	for i, itm := range pdbItems {
		items[i] = ListItem{
			Path: itm.Path,
			Meta: convertMeta(itm.Pointer),
		}
	}
	return items, more, nil
}
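
// Pagination sketch: pass the last returned path as startAfter to fetch the
// next page (the metaFlags constants live with the pointerdb client and are
// assumed to be in scope here):
//
//	var startAfter paths.Path
//	for {
//		items, more, err := store.List(ctx, prefix, startAfter, nil, true, 100, metaFlags)
//		if err != nil {
//			return err
//		}
//		for _, item := range items {
//			process(item.Path, item.Meta) // process is a hypothetical callback
//		}
//		if !more {
//			break
//		}
//		startAfter = items[len(items)-1].Path
//	}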

// convertMeta converts pointer to segment metadata
func convertMeta(pr *ppb.Pointer) Meta {
	return Meta{
		Modified:   convertTime(pr.GetCreationDate()),
		Expiration: convertTime(pr.GetExpirationDate()),
		Size:       pr.GetSize(),
		Data:       pr.GetMetadata(),
	}
}

// convertTime converts gRPC timestamp to Go time
func convertTime(ts *timestamp.Timestamp) time.Time {
	if ts == nil {
		return time.Time{}
	}
	t, err := ptypes.Timestamp(ts)
	if err != nil {
		zap.S().Warnf("Failed converting timestamp %v: %v", ts, err)
	}
	return t
}