Abort trying to upload segments when reader fails (#326)

* abort trying to upload segments when reader fails

* also return the error when it fails
This commit is contained in:
Egon Elbre 2018-09-08 18:41:40 +03:00 committed by JT Olio
parent a575a8e293
commit 4671ac6c5b
2 changed files with 13 additions and 4 deletions

View File

@ -9,17 +9,20 @@ import "io"
type EOFAwareLimitReader struct {
reader io.Reader
eof bool
err error
}
// EOFAwareReader keeps track of the state, has the internal reader reached EOF
func EOFAwareReader(r io.Reader) *EOFAwareLimitReader {
return &EOFAwareLimitReader{reader: r, eof: false}
return &EOFAwareLimitReader{reader: r}
}
func (r *EOFAwareLimitReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
if err == io.EOF {
r.eof = true
} else if err != nil && r.err == nil {
r.err = err
}
return n, err
}
@ -27,3 +30,7 @@ func (r *EOFAwareLimitReader) Read(p []byte) (n int, err error) {
func (r *EOFAwareLimitReader) isEOF() bool {
return r.eof
}
func (r *EOFAwareLimitReader) hasError() bool {
return r.err != nil
}

View File

@ -85,12 +85,11 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
awareLimitReader := EOFAwareReader(data)
for !awareLimitReader.isEOF() {
for !awareLimitReader.isEOF() && !awareLimitReader.hasError() {
segmentPath := path.Prepend(fmt.Sprintf("s%d", totalSegments))
segmentData := io.LimitReader(awareLimitReader, s.segmentSize)
putMeta, err := s.segments.Put(ctx, segmentPath, segmentData,
nil, expiration)
putMeta, err := s.segments.Put(ctx, segmentPath, segmentData, nil, expiration)
if err != nil {
return Meta{}, err
}
@ -98,6 +97,9 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
totalSize = totalSize + putMeta.Size
totalSegments = totalSegments + 1
}
if awareLimitReader.hasError() {
return Meta{}, awareLimitReader.err
}
lastSegmentPath := path.Prepend("l")