storagenode/pieces: ensure we can call Commit or Cancel only once (#1511)
This commit is contained in:
parent
117edec54c
commit
80916ffb53
@ -32,18 +32,23 @@ func (blob *blobReader) Size() (int64, error) {
|
|||||||
|
|
||||||
// blobWriter implements writing blobs
|
// blobWriter implements writing blobs
|
||||||
type blobWriter struct {
|
type blobWriter struct {
|
||||||
ref storage.BlobRef
|
ref storage.BlobRef
|
||||||
store *Store
|
store *Store
|
||||||
|
closed bool
|
||||||
|
|
||||||
*os.File
|
*os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlobWriter(ref storage.BlobRef, store *Store, file *os.File) *blobWriter {
|
func newBlobWriter(ref storage.BlobRef, store *Store, file *os.File) *blobWriter {
|
||||||
return &blobWriter{ref, store, file}
|
return &blobWriter{ref, store, false, file}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel discards the blob.
|
// Cancel discards the blob.
|
||||||
func (blob *blobWriter) Cancel() error {
|
func (blob *blobWriter) Cancel() error {
|
||||||
|
if blob.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
blob.closed = true
|
||||||
err := blob.File.Close()
|
err := blob.File.Close()
|
||||||
removeErr := os.Remove(blob.File.Name())
|
removeErr := os.Remove(blob.File.Name())
|
||||||
return Error.Wrap(errs.Combine(err, removeErr))
|
return Error.Wrap(errs.Combine(err, removeErr))
|
||||||
@ -51,6 +56,10 @@ func (blob *blobWriter) Cancel() error {
|
|||||||
|
|
||||||
// Commit moves the file to the target location.
|
// Commit moves the file to the target location.
|
||||||
func (blob *blobWriter) Commit() error {
|
func (blob *blobWriter) Commit() error {
|
||||||
|
if blob.closed {
|
||||||
|
return Error.New("already closed")
|
||||||
|
}
|
||||||
|
blob.closed = true
|
||||||
err := blob.store.dir.Commit(blob.File, blob.ref)
|
err := blob.store.dir.Commit(blob.File, blob.ref)
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
@ -58,6 +58,10 @@ func TestStoreLoad(t *testing.T) {
|
|||||||
require.Equal(t, n, len(data))
|
require.Equal(t, n, len(data))
|
||||||
|
|
||||||
require.NoError(t, writer.Commit())
|
require.NoError(t, writer.Commit())
|
||||||
|
// after committing we should be able to call cancel without an error
|
||||||
|
require.NoError(t, writer.Cancel())
|
||||||
|
// two commits should fail
|
||||||
|
require.Error(t, writer.Commit())
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace = randomValue()
|
namespace = randomValue()
|
||||||
@ -114,6 +118,8 @@ func TestStoreLoad(t *testing.T) {
|
|||||||
require.Equal(t, n, len(data))
|
require.Equal(t, n, len(data))
|
||||||
|
|
||||||
require.NoError(t, writer.Cancel())
|
require.NoError(t, writer.Cancel())
|
||||||
|
// commit after cancel should return an error
|
||||||
|
require.Error(t, writer.Commit())
|
||||||
|
|
||||||
_, err = store.Open(ctx, ref)
|
_, err = store.Open(ctx, ref)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
@ -20,6 +20,8 @@ type Writer struct {
|
|||||||
hash hash.Hash
|
hash hash.Hash
|
||||||
blob storage.BlobWriter
|
blob storage.BlobWriter
|
||||||
size int64
|
size int64
|
||||||
|
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWriter creates a new writer for storage.BlobWriter.
|
// NewWriter creates a new writer for storage.BlobWriter.
|
||||||
@ -47,14 +49,22 @@ func (w *Writer) Hash() []byte { return w.hash.Sum(nil) }
|
|||||||
|
|
||||||
// Commit commits piece to permanent storage.
|
// Commit commits piece to permanent storage.
|
||||||
func (w *Writer) Commit() error {
|
func (w *Writer) Commit() error {
|
||||||
|
if w.closed {
|
||||||
|
return Error.New("already closed")
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
if err := w.buf.Flush(); err != nil {
|
if err := w.buf.Flush(); err != nil {
|
||||||
return Error.Wrap(errs.Combine(err, w.Cancel()))
|
return Error.Wrap(errs.Combine(err, w.blob.Cancel()))
|
||||||
}
|
}
|
||||||
return Error.Wrap(w.blob.Commit())
|
return Error.Wrap(w.blob.Commit())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel deletes any temporarily written data.
|
// Cancel deletes any temporarily written data.
|
||||||
func (w *Writer) Cancel() error {
|
func (w *Writer) Cancel() error {
|
||||||
|
if w.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
w.buf.Reset(nil)
|
w.buf.Reset(nil)
|
||||||
return Error.Wrap(w.blob.Cancel())
|
return Error.Wrap(w.blob.Cancel())
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,8 @@ func TestPieces(t *testing.T) {
|
|||||||
|
|
||||||
// commit
|
// commit
|
||||||
require.NoError(t, writer.Commit())
|
require.NoError(t, writer.Commit())
|
||||||
|
// after commit we should be able to call cancel without an error
|
||||||
|
require.NoError(t, writer.Cancel())
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // valid reads
|
{ // valid reads
|
||||||
@ -72,6 +74,8 @@ func TestPieces(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, int(length), n)
|
require.Equal(t, int(length), n)
|
||||||
|
|
||||||
|
require.NoError(t, reader.Close())
|
||||||
|
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,10 +103,11 @@ func TestPieces(t *testing.T) {
|
|||||||
|
|
||||||
// cancel writing
|
// cancel writing
|
||||||
require.NoError(t, writer.Cancel())
|
require.NoError(t, writer.Cancel())
|
||||||
|
// commit should not fail
|
||||||
|
require.Error(t, writer.Commit())
|
||||||
|
|
||||||
// read should fail
|
// read should fail
|
||||||
_, err = store.Reader(ctx, satelliteID, cancelledPieceID)
|
_, err = store.Reader(ctx, satelliteID, cancelledPieceID)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user