cmd/storagenode-updater: simplify and reorder update sqeuence (#3487)

This commit is contained in:
Bryan White 2019-11-12 13:31:57 +01:00 committed by GitHub
parent b7a04eb881
commit 7cc4217fef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 137 deletions

View File

@ -74,8 +74,6 @@ var (
identityDir string
)
type renameFunc func(currentVersion version.SemVer) error
func init() {
// TODO: this will probably generate warnings for mismatched config fields.
defaultConfDir := fpath.ApplicationDir("storj", "storagenode")
@ -119,17 +117,17 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
}()
loopFunc := func(ctx context.Context) (err error) {
if err := update(ctx, runCfg.BinaryLocation, runCfg.ServiceName, renameStoragenode); err != nil {
if err := update(ctx, runCfg.BinaryLocation, runCfg.ServiceName); err != nil {
// don't finish loop in case of error just wait for another execution
log.Println(err)
}
// TODO: enable self-autoupdate back when having reliable recovery mechanism
// updaterBinName := os.Args[0]
// if err := update(ctx, updaterBinName, updaterServiceName, renameUpdater); err != nil {
// // don't finish loop in case of error just wait for another execution
// log.Println(err)
// }
//updaterBinName := os.Args[0]
//if err := update(ctx, updaterBinName, updaterServiceName); err != nil {
// // don't finish loop in case of error just wait for another execution
// log.Println(err)
//}
return nil
}
@ -150,7 +148,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return nil
}
func update(ctx context.Context, binPath, serviceName string, renameBinary renameFunc) (err error) {
func update(ctx context.Context, binPath, serviceName string) (err error) {
if nodeID.IsZero() {
log.Fatal("empty node ID")
}
@ -168,95 +166,93 @@ func update(ctx context.Context, binPath, serviceName string, renameBinary renam
client := checker.New(runCfg.ClientConfig)
log.Println("downloading versions from", runCfg.ServerAddress)
shouldUpdate, newVersion, err := client.ShouldUpdate(ctx, serviceName, nodeID)
processVersion, err := client.Process(ctx, serviceName)
if err != nil {
return errs.Wrap(err)
}
if shouldUpdate {
// TODO: consolidate semver.Version and version.SemVer
suggestedVersion, err := newVersion.SemVer()
if err != nil {
return errs.Wrap(err)
}
if currentVersion.Compare(suggestedVersion) < 0 {
tempArchive, err := ioutil.TempFile(os.TempDir(), serviceName)
if err != nil {
return errs.New("cannot create temporary archive: %v", err)
}
defer func() { err = errs.Combine(err, os.Remove(tempArchive.Name())) }()
downloadURL := parseDownloadURL(newVersion.URL)
log.Println("start downloading", downloadURL, "to", tempArchive.Name())
err = downloadArchive(ctx, tempArchive, downloadURL)
if err != nil {
return errs.Wrap(err)
}
log.Println("finished downloading", downloadURL, "to", tempArchive.Name())
err = renameBinary(currentVersion)
if err != nil {
return errs.Wrap(err)
}
err = unpackBinary(ctx, tempArchive.Name(), binPath)
if err != nil {
return errs.Wrap(err)
}
// TODO add here recovery even before starting service (if version command cannot be executed)
downloadedVersion, err := binaryVersion(binPath)
if err != nil {
return errs.Wrap(err)
}
if suggestedVersion.Compare(downloadedVersion) != 0 {
return errs.New("invalid version downloaded: wants %s got %s", suggestedVersion.String(), downloadedVersion.String())
}
log.Println("restarting service", serviceName)
err = restartService(serviceName)
if err != nil {
// TODO: should we try to recover from this?
return errs.New("unable to restart service: %v", err)
}
log.Println("service", serviceName, "restarted successfully")
// TODO remove old binary ??
return nil
}
}
log.Printf("%s version is up to date\n", runCfg.ServiceName)
return nil
}
func renameStoragenode(currentVersion version.SemVer) error {
extension := filepath.Ext(runCfg.BinaryLocation)
dir := filepath.Dir(runCfg.BinaryLocation)
backupExec := filepath.Join(dir, runCfg.ServiceName+".old."+currentVersion.String()+extension)
if err := os.Rename(runCfg.BinaryLocation, backupExec); err != nil {
// TODO: consolidate semver.Version and version.SemVer
suggestedVersion, err := processVersion.Suggested.SemVer()
if err != nil {
return errs.Wrap(err)
}
if currentVersion.Compare(suggestedVersion) >= 0 {
log.Printf("%s version is up to date\n", serviceName)
return nil
}
if !version.ShouldUpdate(processVersion.Rollout, nodeID) {
log.Printf("new %s version available but not rolled out to this nodeID yet\n", serviceName)
return nil
}
tempArchive, err := ioutil.TempFile(os.TempDir(), serviceName)
if err != nil {
return errs.New("cannot create temporary archive: %v", err)
}
defer func() { err = errs.Combine(err, os.Remove(tempArchive.Name())) }()
downloadURL := parseDownloadURL(processVersion.Suggested.URL)
log.Println("start downloading", downloadURL, "to", tempArchive.Name())
err = downloadArchive(ctx, tempArchive, downloadURL)
if err != nil {
return errs.Wrap(err)
}
log.Println("finished downloading", downloadURL, "to", tempArchive.Name())
newVersionPath := prependExtension(binPath, suggestedVersion.String())
err = unpackBinary(ctx, tempArchive.Name(), newVersionPath)
if err != nil {
return errs.Wrap(err)
}
// TODO add here recovery even before starting service (if version command cannot be executed)
downloadedVersion, err := binaryVersion(newVersionPath)
if err != nil {
return errs.Wrap(err)
}
if suggestedVersion.Compare(downloadedVersion) != 0 {
return errs.New("invalid version downloaded: wants %s got %s", suggestedVersion.String(), downloadedVersion.String())
}
// backup original binary
var backupPath string
if serviceName == updaterServiceName {
// NB: don't include old version number for updater binary backup
backupPath = prependExtension(binPath, "old")
} else {
backupPath = prependExtension(binPath, "old."+currentVersion.String())
}
if err := os.Rename(binPath, backupPath); err != nil {
return errs.Wrap(err)
}
// rename new binary to replace original
if err := os.Rename(newVersionPath, binPath); err != nil {
return errs.Wrap(err)
}
log.Println("restarting service", serviceName)
err = restartService(serviceName)
if err != nil {
// TODO: should we try to recover from this?
return errs.New("unable to restart service: %v", err)
}
log.Println("service", serviceName, "restarted successfully")
// TODO remove old binary ??
return nil
}
// func renameUpdater(_ version.SemVer) error {
// updaterBinName := os.Args[0]
// extension := filepath.Ext(updaterBinName)
// dir := filepath.Dir(updaterBinName)
// base := filepath.Base(updaterBinName)
// base = base[:len(base)-len(extension)]
// backupExec := filepath.Join(dir, base+".old"+extension)
// if err := os.Rename(updaterBinName, backupExec); err != nil {
// return errs.Wrap(err)
// }
// return nil
// }
func prependExtension(path, ext string) string {
originalExt := filepath.Ext(path)
dir, base := filepath.Split(path)
base = base[:len(base)-len(originalExt)]
return filepath.Join(dir, base+"."+ext+originalExt)
}
func parseDownloadURL(template string) string {
url := strings.Replace(template, "{os}", runtime.GOOS, 1)

View File

@ -17,7 +17,6 @@ import (
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/storj"
)
var (
@ -124,23 +123,6 @@ func (client *Client) Process(ctx context.Context, processName string) (process
return process, nil
}
// ShouldUpdate downloads the rollout state from the versioncontrol server and
// checks if a user with the given nodeID should update, and if so, to what version.
func (client *Client) ShouldUpdate(ctx context.Context, processName string, nodeID storj.NodeID) (_ bool, _ version.Version, err error) {
defer mon.Task()(&ctx, processName)(&err)
process, err := client.Process(ctx, processName)
if err != nil {
return false, version.Version{}, Error.Wrap(err)
}
shouldUpdate := version.ShouldUpdate(process.Rollout, nodeID)
if shouldUpdate {
return true, process.Suggested, nil
}
return false, version.Version{}, nil
}
func kebabToPascal(str string) string {
return strings.ReplaceAll(strings.Title(str), "-", "")
}

View File

@ -15,7 +15,6 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/version"
"storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/storj"
"storj.io/storj/versioncontrol"
)
@ -87,35 +86,6 @@ func TestClient_Process(t *testing.T) {
}
}
func TestClient_ShouldUpdate(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
peer := newTestPeer(t, ctx)
defer ctx.Check(peer.Close)
clientConfig := checker.ClientConfig{
ServerAddress: "http://" + peer.Addr(),
RequestTimeout: 0,
}
client := checker.New(clientConfig)
processesType := reflect.TypeOf(version.Processes{})
fieldCount := processesType.NumField()
for i := 1; i < fieldCount; i++ {
field := processesType.Field(i - 1)
expectedVersionStr := fmt.Sprintf("v%d.%d.%d", i, i+1, i+2)
// NB: test cursor is 100%; rollout/nodeID should-update calculation is tested elsewhere.
shouldUpdate, ver, err := client.ShouldUpdate(ctx, field.Name, storj.NodeID{})
require.NoError(t, err)
require.True(t, shouldUpdate)
require.Equal(t, expectedVersionStr, ver.Version)
}
}
func newTestPeer(t *testing.T, ctx *testcontext.Context) *versioncontrol.Peer {
t.Helper()