cmd/uplink: cancel on failed copy
Also ensure that abort is given at least 5 seconds to clear up any pending uploads on cancellation. Change-Id: I814aa407ee5783f2609a76b54de2879dcd5f89bb
This commit is contained in:
parent
978e0f1a26
commit
847ddaaab0
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/zeebo/clingy"
|
"github.com/zeebo/clingy"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
|
"storj.io/common/context2"
|
||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/rpc/rpcpool"
|
"storj.io/common/rpc/rpcpool"
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
@ -299,7 +300,12 @@ func parallelCopy(
|
|||||||
|
|
||||||
defer limiter.Wait()
|
defer limiter.Wait()
|
||||||
defer func() { _ = src.Close() }()
|
defer func() { _ = src.Close() }()
|
||||||
defer func() { _ = dst.Abort(ctx) }()
|
defer func() {
|
||||||
|
nocancel := context2.WithoutCancellation(ctx)
|
||||||
|
timedctx, cancel := context.WithTimeout(nocancel, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_ = dst.Abort(timedctx)
|
||||||
|
}()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for i := 0; length != 0; i++ {
|
for i := 0; length != 0; i++ {
|
||||||
@ -343,11 +349,23 @@ func parallelCopy(
|
|||||||
w = bar.NewProxyWriter(w)
|
w = bar.NewProxyWriter(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := io.Copy(w, rh)
|
_, err := sync2.Copy(ctx, w, rh)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = wh.Commit()
|
err = wh.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// abort all other concurrenty copies
|
||||||
|
cancel()
|
||||||
|
// TODO: it would be also nice to use wh.Abort and rh.Close directly
|
||||||
|
// to avoid some of the waiting that's caused by sync2.Copy.
|
||||||
|
//
|
||||||
|
// However, some of the source / destination implementations don't seem
|
||||||
|
// to have concurrent safe API with that regards.
|
||||||
|
//
|
||||||
|
// Also, we may want to check that it actually helps, before implementing it.
|
||||||
|
}
|
||||||
|
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user