Handle CTRL+C and clean up the partial segments and pieces (#381)
* merged the lasted master changes * debug working of handling ctrl+c * Handling of clean up of partially uploaded segments and pieces * code cleanup per code comment * updates based on code review comments
This commit is contained in:
parent
d8c0f18059
commit
cf329b1f37
@ -14,8 +14,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/cheggaaa/pb"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/process"
|
||||
|
@ -8,8 +8,10 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
@ -77,8 +79,16 @@ func Ctx(cmd *cobra.Command) context.Context {
|
||||
defer contextMtx.Unlock()
|
||||
ctx := contexts[cmd]
|
||||
if ctx == nil {
|
||||
return context.Background()
|
||||
ctx = context.Background()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-c
|
||||
signal.Stop(c)
|
||||
cancel()
|
||||
}()
|
||||
return ctx
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
|
||||
}
|
||||
|
||||
type info struct {
|
||||
i int
|
||||
i int
|
||||
err error
|
||||
}
|
||||
infos := make(chan info, len(nodes))
|
||||
@ -126,6 +126,18 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
|
||||
}
|
||||
}
|
||||
|
||||
/* clean up the partially uploaded segment's pieces */
|
||||
defer func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = utils.CombineErrors(
|
||||
Error.New("upload cancelled by user"),
|
||||
ec.Delete(context.Background(), nodes, pieceID),
|
||||
)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
if successfulCount < rs.RepairThreshold() {
|
||||
return nil, Error.New("successful puts (%d) less than repair threshold (%d)", successfulCount, rs.RepairThreshold())
|
||||
}
|
||||
@ -153,7 +165,7 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
|
||||
ch := make(chan rangerInfo, len(nodes))
|
||||
|
||||
for i, n := range nodes {
|
||||
if (n == nil) {
|
||||
if n == nil {
|
||||
ch <- rangerInfo{i: i, rr: nil, err: nil}
|
||||
continue
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
@ -100,6 +101,14 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
|
||||
var streamSize int64
|
||||
var putMeta segments.Meta
|
||||
|
||||
defer func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.cancelHandler(context.Background(), currentSegment, path)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
derivedKey, err := path.DeriveContentKey(s.rootKey)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
@ -418,3 +427,14 @@ func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, c
|
||||
}
|
||||
return eestream.Unpad(rd, int(rd.Size()-decryptedSize))
|
||||
}
|
||||
|
||||
// CancelHandler handles clean up of segments on receiving CTRL+C
|
||||
func (s *streamStore) cancelHandler(ctx context.Context, totalSegments int64, path paths.Path) {
|
||||
for i := int64(0); i < totalSegments; i++ {
|
||||
currentPath := getSegmentPath(path, i)
|
||||
err := s.segments.Delete(ctx, currentPath)
|
||||
if err != nil {
|
||||
zap.S().Warnf("Failed deleting a segment %v %v", currentPath, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user