storj/satellite/metainfo/metabase/raw.go
Michal Niewrzal 809eb14ac2 satellite/metainfo/metabase: move metainfo PoC into storj repo
Change-Id: I39356d8bc7305b4a8ea0c1fb5603010ad72a68b9
2020-10-29 09:44:55 +01:00

201 lines
4.6 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
)
// RawObject defines the full object that is stored in the database. It should be rarely used directly.
type RawObject struct {
ObjectStream
CreatedAt time.Time
ExpiresAt *time.Time
Status ObjectStatus
SegmentCount int32
EncryptedMetadataNonce []byte
EncryptedMetadata []byte
TotalEncryptedSize int64
FixedSegmentSize int32
Encryption storj.EncryptionParameters
// ZombieDeletionDeadline defines when the pending raw object should be deleted from the database.
// This is as a safeguard against objects that failed to upload and the client has not indicated
// whether they want to continue uploading or delete the already uploaded data.
ZombieDeletionDeadline *time.Time
}
// RawSegment defines the full segment that is stored in the database. It should be rarely used directly.
type RawSegment struct {
StreamID uuid.UUID
Position SegmentPosition
RootPieceID storj.PieceID
EncryptedKeyNonce []byte
EncryptedKey []byte
EncryptedSize int32 // size of the whole segment (not a piece)
PlainSize int32
PlainOffset int64
// TODO: add fields for proofs/chains
Redundancy storj.RedundancyScheme
InlineData []byte
Pieces Pieces
}
// RawState contains full state of a table.
type RawState struct {
Objects []RawObject
Segments []RawSegment
}
// TestingGetState returns the state of the database.
func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) {
state := &RawState{}
state.Objects, err = db.testingGetAllObjects(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
state.Segments, err = db.testingGetAllSegments(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
return state, nil
}
// TestingDeleteAll deletes all objects and segments from the database.
func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
_, err = db.db.ExecContext(ctx, `
DELETE FROM objects;
DELETE FROM segments;
`)
return Error.Wrap(err)
}
// testingGetAllObjects returns the state of the database.
func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
objs := []RawObject{}
rows, err := db.db.Query(ctx, `
SELECT
project_id, bucket_name, object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption,
zombie_deletion_deadline
FROM objects
`)
if err != nil {
return nil, Error.New("testingGetAllObjects query: %w", err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var obj RawObject
err := rows.Scan(
&obj.ProjectID,
&obj.BucketName,
&obj.ObjectKey,
&obj.Version,
&obj.StreamID,
&obj.CreatedAt,
&obj.ExpiresAt,
&obj.Status, // TODO: fix encoding
&obj.SegmentCount,
&obj.EncryptedMetadataNonce,
&obj.EncryptedMetadata,
&obj.TotalEncryptedSize,
&obj.FixedSegmentSize,
encryptionParameters{&obj.Encryption},
&obj.ZombieDeletionDeadline,
)
if err != nil {
return nil, Error.New("testingGetAllObjects scan failed: %w", err)
}
objs = append(objs, obj)
}
if err := rows.Err(); err != nil {
return nil, Error.New("testingGetAllObjects scan failed: %w", err)
}
if len(objs) == 0 {
return nil, nil
}
return objs, nil
}
// testingGetAllSegments returns the state of the database.
func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) {
segs := []RawSegment{}
rows, err := db.db.Query(ctx, `
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size,
plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
FROM segments
`)
if err != nil {
return nil, Error.New("testingGetAllSegments query: %w", err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg RawSegment
err := rows.Scan(
&seg.StreamID,
&seg.Position,
&seg.RootPieceID,
&seg.EncryptedKeyNonce,
&seg.EncryptedKey,
&seg.EncryptedSize,
&seg.PlainOffset,
&seg.PlainSize,
redundancyScheme{&seg.Redundancy},
&seg.InlineData,
&seg.Pieces,
)
if err != nil {
return nil, Error.New("testingGetAllSegments scan failed: %w", err)
}
segs = append(segs, seg)
}
if err := rows.Err(); err != nil {
return nil, Error.New("testingGetAllSegments scan failed: %w", err)
}
if len(segs) == 0 {
return nil, nil
}
return segs, nil
}