Metainfo RPC objects methods (#2534)

This commit is contained in:
Michal Niewrzal 2019-07-16 12:39:23 +02:00 committed by GitHub
parent b51d3a69da
commit 260d9c49a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2527 additions and 120 deletions

View File

@ -94,10 +94,8 @@ func SerialNumber() storj.SerialNumber {
}
// StreamID creates a random stream ID
func StreamID() storj.StreamID {
var streamID storj.StreamID
Read(streamID[:])
return streamID
func StreamID(size int) storj.StreamID {
return storj.StreamID(BytesInt(size))
}
// UUID creates a random uuid.

View File

@ -88,3 +88,14 @@ func EncodeVoucher(ctx context.Context, voucher *pb.Voucher) (_ []byte, err erro
voucher.SatelliteSignature = signature
return out, err
}
// EncodeStreamID encodes stream ID into bytes for signing.
func EncodeStreamID(ctx context.Context, streamID *pb.SatStreamID) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
signature := streamID.SatelliteSignature
// TODO verify if that can cause race
streamID.SatelliteSignature = nil
out, err := proto.Marshal(streamID)
streamID.SatelliteSignature = signature
return out, err
}

View File

@ -109,3 +109,21 @@ func SignVoucher(ctx context.Context, signer Signer, unsigned *pb.Voucher) (_ *p
return &signed, nil
}
// SignStreamID signs the stream ID using the specified signer
// Signer is a satellite
func SignStreamID(ctx context.Context, signer Signer, unsigned *pb.SatStreamID) (_ *pb.SatStreamID, err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeStreamID(ctx, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
signed := *unsigned
signed.SatelliteSignature, err = signer.HashAndSign(ctx, bytes)
if err != nil {
return nil, Error.Wrap(err)
}
return &signed, nil
}

View File

@ -82,3 +82,14 @@ func VerifyVoucher(ctx context.Context, satellite Signee, signed *pb.Voucher) (e
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
}
// VerifyStreamID verifies that the signature inside stream ID belongs to the satellite
func VerifyStreamID(ctx context.Context, satellite Signee, signed *pb.SatStreamID) (err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeStreamID(ctx, signed)
if err != nil {
return Error.Wrap(err)
}
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
}

File diff suppressed because it is too large Load Diff

View File

@ -15,11 +15,18 @@ import "orders.proto";
// Metainfo it's a satellite RPC service
service Metainfo {
// Bucket
rpc CreateBucket(BucketCreateRequest) returns (BucketCreateResponse);
rpc GetBucket(BucketGetRequest) returns (BucketGetResponse);
rpc DeleteBucket(BucketDeleteRequest) returns (BucketDeleteResponse);
rpc ListBuckets(BucketListRequest) returns (BucketListResponse);
rpc SetBucketAttribution(BucketSetAttributionRequest) returns (BucketSetAttributionResponse);
// Object
rpc BeginObject(ObjectBeginRequest) returns (ObjectBeginResponse);
rpc CommitObject(ObjectCommitRequest) returns (ObjectCommitResponse);
rpc ListObjects(ObjectListRequest) returns (ObjectListResponse);
rpc BeginDeleteObject(ObjectBeginDeleteRequest) returns (ObjectBeginDeleteResponse);
rpc FinishDeleteObject(ObjectFinishDeleteRequest) returns (ObjectFinishDeleteResponse);
rpc CreateSegmentOld(SegmentWriteRequestOld) returns (SegmentWriteResponseOld);
rpc CommitSegmentOld(SegmentCommitRequestOld) returns (SegmentCommitResponseOld);
@ -196,3 +203,129 @@ message ProjectInfoRequest {
message ProjectInfoResponse {
bytes project_salt = 1;
}
message Object {
enum Status {
INVALID = 0;
UPLOADING = 1;
COMMITTING = 2;
COMMITTED = 3;
DELETING = 4;
}
bytes bucket = 1;
bytes encrypted_path = 2;
int32 version = 3;
Status status = 4;
bytes stream_id = 5 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
google.protobuf.Timestamp created_at = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp status_at = 7 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp expires_at = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes encrypted_metadata_nonce = 9 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_metadata = 10;
int64 fixed_segment_size = 11;
pointerdb.RedundancyScheme redundancy_scheme = 12;
encryption.EncryptionParameters encryption_parameters = 13;
int64 total_size = 14; // total size of object
int64 inline_size = 15; // size of inline part of object
int64 remote_size = 16; // size of remote part of object
}
message ObjectBeginRequest {
bytes bucket = 1;
bytes encrypted_path = 2;
int32 version = 3;
google.protobuf.Timestamp expires_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes encrypted_metadata_nonce = 5 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_metadata = 6; // TODO: set maximum size limit
pointerdb.RedundancyScheme redundancy_scheme = 7; // can be zero
encryption.EncryptionParameters encryption_parameters = 8; // can be zero
}
message ObjectBeginResponse {
bytes bucket = 1;
bytes encrypted_path = 2;
int32 version = 3;
bytes stream_id = 4 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
pointerdb.RedundancyScheme redundancy_scheme = 5;
encryption.EncryptionParameters encryption_parameters = 6;
}
message ObjectCommitRequest {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
}
message ObjectCommitResponse {
}
message ObjectListRequest {
bytes bucket = 1;
bytes encrypted_prefix = 2;
bytes encrypted_cursor = 3;
int32 limit = 4;
ObjectListItemIncludes object_includes = 5;
}
message ObjectListResponse {
repeated ObjectListItem items = 1;
bool more = 2;
}
message ObjectListItem {
bytes encrypted_path = 1;
int32 version = 2;
Object.Status status = 3;
google.protobuf.Timestamp created_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp status_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp expires_at = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes encrypted_metadata_nonce = 7 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_metadata = 8;
}
message ObjectListItemIncludes {
bool metadata = 1;
}
message ObjectBeginDeleteRequest {
bytes bucket = 1;
bytes encrypted_path = 2;
int32 version = 3;
}
message ObjectBeginDeleteResponse {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
}
message ObjectFinishDeleteRequest {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
}
message ObjectFinishDeleteResponse {
}
// only for satellite use
message SatStreamID {
bytes bucket = 1;
bytes encrypted_path = 2;
int32 version = 3;
pointerdb.RedundancyScheme redundancy = 4;
google.protobuf.Timestamp creation_date = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp expiration_date = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes satellite_signature = 7;
}

View File

@ -28,3 +28,6 @@ type SerialNumber = storj.SerialNumber
// StreamID is an alias to storj.StreamID for use in generated protobuf code
type StreamID = storj.StreamID
// Nonce is an alias to storj.Nonce for use in generated protobuf code
type Nonce = storj.Nonce

View File

@ -3,6 +3,12 @@
package storj
import (
"encoding/base32"
"github.com/zeebo/errs"
)
// EncryptionParameters is the cipher suite and parameters used for encryption
type EncryptionParameters struct {
// CipherSuite specifies the cipher suite to be used for encryption.
@ -70,13 +76,85 @@ func (key *Key) IsZero() bool {
return key == nil || *key == (Key{})
}
// ErrNonce is used when something goes wrong with a stream ID
var ErrNonce = errs.Class("nonce error")
// nonceEncoding is base32 without padding
var nonceEncoding = base32.StdEncoding.WithPadding(base32.NoPadding)
// Nonce represents the largest nonce used by any encryption protocol
type Nonce [NonceSize]byte
// NonceFromString decodes an base32 encoded
func NonceFromString(s string) (Nonce, error) {
nonceBytes, err := nonceEncoding.DecodeString(s)
if err != nil {
return Nonce{}, ErrNonce.Wrap(err)
}
return NonceFromBytes(nonceBytes)
}
// NonceFromBytes converts a byte slice into a nonce
func NonceFromBytes(b []byte) (Nonce, error) {
if len(b) != len(Nonce{}) {
return Nonce{}, ErrNonce.New("not enough bytes to make a nonce; have %d, need %d", len(b), len(NodeID{}))
}
var nonce Nonce
copy(nonce[:], b)
return nonce, nil
}
// IsZero returns whether nonce is unassigned
func (nonce Nonce) IsZero() bool {
return nonce == Nonce{}
}
// String representation of the nonce
func (nonce Nonce) String() string { return nonceEncoding.EncodeToString(nonce.Bytes()) }
// Bytes returns bytes of the nonce
func (nonce Nonce) Bytes() []byte { return nonce[:] }
// Raw returns the nonce as a raw byte array pointer
func (nonce *Nonce) Raw() *[NonceSize]byte {
return (*[NonceSize]byte)(nonce)
}
// Marshal serializes a nonce
func (nonce Nonce) Marshal() ([]byte, error) {
return nonce.Bytes(), nil
}
// MarshalTo serializes a nonce into the passed byte slice
func (nonce *Nonce) MarshalTo(data []byte) (n int, err error) {
n = copy(data, nonce.Bytes())
return n, nil
}
// Unmarshal deserializes a nonce
func (nonce *Nonce) Unmarshal(data []byte) error {
var err error
*nonce, err = NonceFromBytes(data)
return err
}
// Size returns the length of a nonce (implements gogo's custom type interface)
func (nonce Nonce) Size() int {
return len(nonce)
}
// MarshalJSON serializes a nonce to a json string as bytes
func (nonce Nonce) MarshalJSON() ([]byte, error) {
return []byte(`"` + nonce.String() + `"`), nil
}
// UnmarshalJSON deserializes a json string (as bytes) to a nonce
func (nonce *Nonce) UnmarshalJSON(data []byte) error {
var err error
*nonce, err = NonceFromString(string(data))
return err
}
// EncryptedPrivateKey is a private key that has been encrypted
type EncryptedPrivateKey []byte

View File

@ -0,0 +1,20 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storj
import (
"time"
)
// ObjectListItem represents listed object
type ObjectListItem struct {
EncryptedPath []byte
Version int32
Status int32
CreatedAt time.Time
StatusAt time.Time
ExpiresAt time.Time
EncryptedMetadataNonce Nonce
EncryptedMetadata []byte
}

View File

@ -17,7 +17,7 @@ var ErrStreamID = errs.Class("stream ID error")
var streamIDEncoding = base32.StdEncoding.WithPadding(base32.NoPadding)
// StreamID is the unique identifier for stream related to object
type StreamID [16]byte
type StreamID []byte
// StreamIDFromString decodes an base32 encoded
func StreamIDFromString(s string) (StreamID, error) {
@ -30,18 +30,14 @@ func StreamIDFromString(s string) (StreamID, error) {
// StreamIDFromBytes converts a byte slice into a stream ID
func StreamIDFromBytes(b []byte) (StreamID, error) {
if len(b) != len(StreamID{}) {
return StreamID{}, ErrStreamID.New("not enough bytes to make a stream ID; have %d, need %d", len(b), len(NodeID{}))
}
var id StreamID
copy(id[:], b)
id := make([]byte, len(b))
copy(id, b)
return id, nil
}
// IsZero returns whether stream ID is unassigned
func (id StreamID) IsZero() bool {
return id == StreamID{}
return len(id) == 0
}
// String representation of the stream ID
@ -69,7 +65,7 @@ func (id *StreamID) Unmarshal(data []byte) error {
}
// Size returns the length of a stream ID (implements gogo's custom type interface)
func (id *StreamID) Size() int {
func (id StreamID) Size() int {
return len(id)
}

View File

@ -6,28 +6,28 @@ package storj_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/storj"
)
func TestStreamID_Encode(t *testing.T) {
_, err := storj.StreamIDFromString("likn43kilfzd")
assert.Error(t, err)
_, err = storj.StreamIDFromBytes([]byte{1, 2, 3, 4, 5})
assert.Error(t, err)
for i := 0; i < 10; i++ {
streamID := testrand.StreamID()
expectedSize := testrand.Intn(255)
streamID := testrand.StreamID(expectedSize)
fromString, err := storj.StreamIDFromString(streamID.String())
assert.NoError(t, err)
fromBytes, err := storj.StreamIDFromBytes(streamID.Bytes())
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, streamID.String(), fromString.String())
assert.Equal(t, streamID, fromString)
assert.Equal(t, streamID, fromBytes)
fromBytes, err := storj.StreamIDFromBytes(streamID.Bytes())
require.NoError(t, err)
require.Equal(t, streamID.Bytes(), fromBytes.Bytes())
require.Equal(t, streamID, fromString)
require.Equal(t, expectedSize, fromString.Size())
require.Equal(t, streamID, fromBytes)
require.Equal(t, expectedSize, fromBytes.Size())
}
}

View File

@ -1569,6 +1569,32 @@
{
"protopath": "pkg:/:pb:/:metainfo.proto",
"def": {
"enums": [
{
"name": "Object.Status",
"enum_fields": [
{
"name": "INVALID"
},
{
"name": "UPLOADING",
"integer": 1
},
{
"name": "COMMITTING",
"integer": 2
},
{
"name": "COMMITTED",
"integer": 3
},
{
"name": "DELETING",
"integer": 4
}
]
}
],
"messages": [
{
"name": "Bucket",
@ -2144,6 +2170,538 @@
"type": "bytes"
}
]
},
{
"name": "Object",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 3,
"name": "version",
"type": "int32"
},
{
"id": 4,
"name": "status",
"type": "Status"
},
{
"id": 5,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 6,
"name": "created_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 7,
"name": "status_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 8,
"name": "expires_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 9,
"name": "encrypted_metadata_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 10,
"name": "encrypted_metadata",
"type": "bytes"
},
{
"id": 11,
"name": "fixed_segment_size",
"type": "int64"
},
{
"id": 12,
"name": "redundancy_scheme",
"type": "pointerdb.RedundancyScheme"
},
{
"id": 13,
"name": "encryption_parameters",
"type": "encryption.EncryptionParameters"
},
{
"id": 14,
"name": "total_size",
"type": "int64"
},
{
"id": 15,
"name": "inline_size",
"type": "int64"
},
{
"id": 16,
"name": "remote_size",
"type": "int64"
}
]
},
{
"name": "ObjectBeginRequest",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 3,
"name": "version",
"type": "int32"
},
{
"id": 4,
"name": "expires_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 5,
"name": "encrypted_metadata_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 6,
"name": "encrypted_metadata",
"type": "bytes"
},
{
"id": 7,
"name": "redundancy_scheme",
"type": "pointerdb.RedundancyScheme"
},
{
"id": 8,
"name": "encryption_parameters",
"type": "encryption.EncryptionParameters"
}
]
},
{
"name": "ObjectBeginResponse",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 3,
"name": "version",
"type": "int32"
},
{
"id": 4,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 5,
"name": "redundancy_scheme",
"type": "pointerdb.RedundancyScheme"
},
{
"id": 6,
"name": "encryption_parameters",
"type": "encryption.EncryptionParameters"
}
]
},
{
"name": "ObjectCommitRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
{
"name": "ObjectCommitResponse"
},
{
"name": "ObjectListRequest",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_prefix",
"type": "bytes"
},
{
"id": 3,
"name": "encrypted_cursor",
"type": "bytes"
},
{
"id": 4,
"name": "limit",
"type": "int32"
},
{
"id": 5,
"name": "object_includes",
"type": "ObjectListItemIncludes"
}
]
},
{
"name": "ObjectListResponse",
"fields": [
{
"id": 1,
"name": "items",
"type": "ObjectListItem",
"is_repeated": true
},
{
"id": 2,
"name": "more",
"type": "bool"
}
]
},
{
"name": "ObjectListItem",
"fields": [
{
"id": 1,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 2,
"name": "version",
"type": "int32"
},
{
"id": 3,
"name": "status",
"type": "Object.Status"
},
{
"id": 4,
"name": "created_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 5,
"name": "status_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 6,
"name": "expires_at",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 7,
"name": "encrypted_metadata_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 8,
"name": "encrypted_metadata",
"type": "bytes"
}
]
},
{
"name": "ObjectListItemIncludes",
"fields": [
{
"id": 1,
"name": "metadata",
"type": "bool"
}
]
},
{
"name": "ObjectBeginDeleteRequest",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 3,
"name": "version",
"type": "int32"
}
]
},
{
"name": "ObjectBeginDeleteResponse",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
{
"name": "ObjectFinishDeleteRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
{
"name": "ObjectFinishDeleteResponse"
},
{
"name": "SatStreamID",
"fields": [
{
"id": 1,
"name": "bucket",
"type": "bytes"
},
{
"id": 2,
"name": "encrypted_path",
"type": "bytes"
},
{
"id": 3,
"name": "version",
"type": "int32"
},
{
"id": 4,
"name": "redundancy",
"type": "pointerdb.RedundancyScheme"
},
{
"id": 5,
"name": "creation_date",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 6,
"name": "expiration_date",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 7,
"name": "satellite_signature",
"type": "bytes"
}
]
}
],
"services": [
@ -2175,6 +2733,31 @@
"in_type": "BucketSetAttributionRequest",
"out_type": "BucketSetAttributionResponse"
},
{
"name": "BeginObject",
"in_type": "ObjectBeginRequest",
"out_type": "ObjectBeginResponse"
},
{
"name": "CommitObject",
"in_type": "ObjectCommitRequest",
"out_type": "ObjectCommitResponse"
},
{
"name": "ListObjects",
"in_type": "ObjectListRequest",
"out_type": "ObjectListResponse"
},
{
"name": "BeginDeleteObject",
"in_type": "ObjectBeginDeleteRequest",
"out_type": "ObjectBeginDeleteResponse"
},
{
"name": "FinishDeleteObject",
"in_type": "ObjectFinishDeleteRequest",
"out_type": "ObjectFinishDeleteResponse"
},
{
"name": "CreateSegmentOld",
"in_type": "SegmentWriteRequestOld",

View File

@ -10,6 +10,7 @@ import (
"strconv"
"time"
"github.com/gogo/protobuf/proto"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -19,10 +20,12 @@ import (
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/auth/signing"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/macaroon"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storage/meta"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
@ -30,7 +33,10 @@ import (
"storj.io/storj/storage"
)
const pieceHashExpiration = 2 * time.Hour
const (
pieceHashExpiration = 2 * time.Hour
satIDExpiration = 24 * time.Hour
)
var (
mon = monkit.Package()
@ -65,11 +71,12 @@ type Endpoint struct {
apiKeys APIKeys
createRequests *createRequests
rsConfig RSConfig
satellite signing.Signer
}
// NewEndpoint creates new metainfo endpoint instance
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, partnerinfo attribution.DB,
containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint {
containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig, satellite signing.Signer) *Endpoint {
// TODO do something with too many params
return &Endpoint{
log: log,
@ -82,6 +89,7 @@ func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cac
projectUsage: projectUsage,
createRequests: newCreateRequests(),
rsConfig: rsConfig,
satellite: satellite,
}
}
@ -847,3 +855,248 @@ func convertBucketToProto(ctx context.Context, bucket storj.Bucket) (pbBucket *p
},
}
}
// BeginObject begins object
func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRequest) (resp *pb.ObjectBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
bucket, err := endpoint.metainfo.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
// take bucket RS values if not set in request
pbRS := req.RedundancyScheme
if pbRS.ErasureShareSize == 0 {
pbRS.ErasureShareSize = bucket.DefaultRedundancyScheme.ShareSize
}
if pbRS.MinReq == 0 {
pbRS.MinReq = int32(bucket.DefaultRedundancyScheme.RequiredShares)
}
if pbRS.RepairThreshold == 0 {
pbRS.RepairThreshold = int32(bucket.DefaultRedundancyScheme.RepairShares)
}
if pbRS.SuccessThreshold == 0 {
pbRS.SuccessThreshold = int32(bucket.DefaultRedundancyScheme.OptimalShares)
}
if pbRS.Total == 0 {
pbRS.Total = int32(bucket.DefaultRedundancyScheme.TotalShares)
}
pbEP := req.EncryptionParameters
if pbEP.CipherSuite == 0 {
pbEP.CipherSuite = pb.CipherSuite(bucket.DefaultEncryptionParameters.CipherSuite)
}
if pbEP.BlockSize == 0 {
pbEP.BlockSize = int64(bucket.DefaultEncryptionParameters.BlockSize)
}
satStreamID := &pb.SatStreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
Redundancy: pbRS,
CreationDate: time.Now(),
ExpirationDate: req.ExpiresAt,
}
satStreamID, err = signing.SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
encodedStreamID, err := proto.Marshal(satStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
streamID, err := storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
return &pb.ObjectBeginResponse{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
StreamId: streamID,
RedundancyScheme: pbRS,
EncryptionParameters: pbEP,
}, nil
}
// CommitObject commits object when all segments are also committed
func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID := &pb.SatStreamID{}
err = proto.Unmarshal(req.StreamId, streamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, streamID)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
if streamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, status.Errorf(codes.InvalidArgument, "stream ID expired")
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// we don't need to do anything for shim implementation
return &pb.ObjectCommitResponse{}, nil
}
// ListObjects list objects according to specific parameters
func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListRequest) (resp *pb.ObjectListResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionList,
Bucket: req.Bucket,
EncryptedPath: []byte{},
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
prefix, err := CreatePath(ctx, keyInfo.ProjectID, -1, req.Bucket, req.EncryptedPrefix)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
metaflags := meta.All
// TODO use flags
// TODO find out how EncryptedCursor -> startAfter/endAfter
segments, more, err := endpoint.metainfo.List(ctx, prefix, "", "", false, req.Limit, metaflags)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
items := make([]*pb.ObjectListItem, len(segments))
for i, segment := range segments {
items[i] = &pb.ObjectListItem{
EncryptedPath: []byte(segment.Path),
CreatedAt: segment.Pointer.CreationDate,
ExpiresAt: segment.Pointer.ExpirationDate,
}
}
return &pb.ObjectListResponse{
Items: items,
More: more,
}, nil
}
// BeginDeleteObject begins object deletion process
func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectBeginDeleteRequest) (resp *pb.ObjectBeginDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
satStreamID := &pb.SatStreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
CreationDate: time.Now(),
}
satStreamID, err = signing.SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
encodedStreamID, err := proto.Marshal(satStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
streamID, err := storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
return &pb.ObjectBeginDeleteResponse{
StreamId: streamID,
}, nil
}
// FinishDeleteObject finishes object deletion
func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.ObjectFinishDeleteRequest) (resp *pb.ObjectFinishDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID := &pb.SatStreamID{}
err = proto.Unmarshal(req.StreamId, streamID)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, streamID)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
if streamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, status.Errorf(codes.InvalidArgument, "stream ID expired")
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// we don't need to do anything for shim implementation
return &pb.ObjectFinishDeleteResponse{}, nil
}

View File

@ -6,6 +6,7 @@ package metainfo_test
import (
"context"
"sort"
"strconv"
"testing"
"time"
@ -813,3 +814,103 @@ func TestBucketNameValidation(t *testing.T) {
}
})
}
func TestBeginCommitObject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
uplink := planet.Uplinks[0]
config := uplink.GetConfig(planet.Satellites[0])
metainfoService := planet.Satellites[0].Metainfo.Service
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
projectID := projects[0].ID
bucket := storj.Bucket{
Name: "initial-bucket",
ProjectID: projectID,
PathCipher: config.GetEncryptionParameters().CipherSuite,
}
_, err = metainfoService.CreateBucket(ctx, bucket)
require.NoError(t, err)
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
streamID, err := metainfo.BeginObject(
ctx,
[]byte(bucket.Name),
[]byte("encrypted-path"),
1,
storj.RedundancyScheme{},
storj.EncryptionParameters{},
time.Time{},
testrand.Nonce(),
testrand.Bytes(memory.KiB),
)
require.NoError(t, err)
err = metainfo.CommitObject(ctx, streamID)
require.NoError(t, err)
})
}
func TestBeginFinishDeleteObject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
streamID, err := metainfo.BeginDeleteObject(
ctx,
[]byte("initial-bucket"),
[]byte("encrypted-path"),
1,
)
require.NoError(t, err)
err = metainfo.FinishDeleteObject(ctx, streamID)
require.NoError(t, err)
})
}
func TestListObjects(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
uplink := planet.Uplinks[0]
files := make([]string, 10)
data := testrand.Bytes(1 * memory.KiB)
for i := 0; i < len(files); i++ {
files[i] = "path" + strconv.Itoa(i)
err := uplink.Upload(ctx, planet.Satellites[0], "testbucket", files[i], data)
require.NoError(t, err)
}
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
items, _, err := metainfo.ListObjects(ctx, []byte("testbucket"), []byte(""), []byte(""), 0)
require.NoError(t, err)
require.Equal(t, len(files), len(items))
for _, item := range items {
require.NotEmpty(t, item.EncryptedPath)
require.True(t, item.CreatedAt.Before(time.Now()))
}
items, _, err = metainfo.ListObjects(ctx, []byte("testbucket"), []byte(""), []byte(""), 3)
require.NoError(t, err)
require.Equal(t, 3, len(items))
})
}

View File

@ -418,6 +418,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.DB.Console().APIKeys(),
peer.Accounting.ProjectUsage,
config.Metainfo.RS,
signing.SignerFromFullIdentity(peer.Identity),
)
pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)

View File

@ -341,3 +341,103 @@ func convertProtoToBucket(pbBucket *pb.Bucket) storj.Bucket {
},
}
}
// BeginObject begins object creation
func (client *Client) BeginObject(ctx context.Context, bucket []byte, encryptedPath []byte, version int32,
rs storj.RedundancyScheme, ep storj.EncryptionParameters, expiresAt time.Time, nonce storj.Nonce, encryptedMetadata []byte) (_ storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
// TODO do proper algorithm conversion
response, err := client.client.BeginObject(ctx, &pb.ObjectBeginRequest{
Bucket: bucket,
EncryptedPath: encryptedPath,
Version: version,
ExpiresAt: expiresAt,
EncryptedMetadataNonce: nonce,
EncryptedMetadata: encryptedMetadata,
RedundancyScheme: &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
ErasureShareSize: rs.ShareSize,
MinReq: int32(rs.RequiredShares),
RepairThreshold: int32(rs.RepairShares),
SuccessThreshold: int32(rs.OptimalShares),
Total: int32(rs.TotalShares),
},
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(ep.CipherSuite),
BlockSize: int64(ep.BlockSize),
},
})
if err != nil {
return nil, Error.Wrap(err)
}
return response.StreamId, nil
}
// CommitObject commits created object
func (client *Client) CommitObject(ctx context.Context, streamID storj.StreamID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = client.client.CommitObject(ctx, &pb.ObjectCommitRequest{
StreamId: streamID,
})
return Error.Wrap(err)
}
// BeginDeleteObject begins object deletion process
func (client *Client) BeginDeleteObject(ctx context.Context, bucket []byte, encryptedPath []byte, version int32) (_ storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
response, err := client.client.BeginDeleteObject(ctx, &pb.ObjectBeginDeleteRequest{
Bucket: bucket,
EncryptedPath: encryptedPath,
Version: version,
})
if err != nil {
return storj.StreamID{}, Error.Wrap(err)
}
return response.StreamId, nil
}
// FinishDeleteObject finishes object deletion process
func (client *Client) FinishDeleteObject(ctx context.Context, streamID storj.StreamID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = client.client.FinishDeleteObject(ctx, &pb.ObjectFinishDeleteRequest{
StreamId: streamID,
})
return Error.Wrap(err)
}
// ListObjects lists objects according to specific parameters
func (client *Client) ListObjects(ctx context.Context, bucket []byte, encryptedPrefix []byte, encryptedCursor []byte, limit int32) (_ []storj.ObjectListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
response, err := client.client.ListObjects(ctx, &pb.ObjectListRequest{
Bucket: bucket,
EncryptedPrefix: encryptedPrefix,
EncryptedCursor: encryptedCursor,
Limit: limit,
})
if err != nil {
return []storj.ObjectListItem{}, false, Error.Wrap(err)
}
objects := make([]storj.ObjectListItem, len(response.Items))
for i, object := range response.Items {
objects[i] = storj.ObjectListItem{
EncryptedPath: object.EncryptedPath,
Version: object.Version,
Status: int32(object.Status),
StatusAt: object.StatusAt,
CreatedAt: object.CreatedAt,
ExpiresAt: object.ExpiresAt,
EncryptedMetadataNonce: object.EncryptedMetadataNonce,
EncryptedMetadata: object.EncryptedMetadata,
}
}
return objects, response.More, Error.Wrap(err)
}