storagenode-updater: add autoupdating (#3422)

This commit is contained in:
Michal Niewrzal 2019-10-31 05:27:53 -07:00 committed by GitHub
parent 7e7de8a29d
commit 015350e230
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 380 additions and 123 deletions

View File

@ -9,6 +9,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
@ -37,10 +38,15 @@ import (
"storj.io/storj/pkg/storj"
)
const minCheckInterval = time.Minute
const (
updaterServiceName = "storagenode-updater"
minCheckInterval = time.Minute
)
var (
cancel context.CancelFunc
// TODO: replace with config value of random bytes in storagenode config.
nodeID storj.NodeID
rootCmd = &cobra.Command{
Use: "storagenode-updater",
@ -60,6 +66,7 @@ var (
BinaryLocation string `help:"the storage node executable binary location" default:"storagenode.exe"`
ServiceName string `help:"storage node OS service name" default:"storagenode"`
// NB: can't use `log.output` because windows service command args containing "." are bugged.
Log string `help:"path to log file, if empty standard output will be used" default:""`
}
@ -67,6 +74,8 @@ var (
identityDir string
)
type renameFunc func(currentVersion version.SemVer) error
func init() {
// TODO: this will probably generate warnings for mismatched config fields.
defaultConfDir := fpath.ApplicationDir("storj", "storagenode")
@ -81,14 +90,8 @@ func init() {
}
func cmdRun(cmd *cobra.Command, args []string) (err error) {
if runCfg.Log != "" {
logFile, err := os.OpenFile(runCfg.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening log file: %s", err)
}
defer func() { err = errs.Combine(err, logFile.Close()) }()
log.SetOutput(logFile)
}
closeLog, err := openLog()
defer func() { err = errs.Combine(err, closeLog()) }()
if !fileExists(runCfg.BinaryLocation) {
log.Fatal("unable to find storage node executable binary")
@ -98,9 +101,13 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
if err != nil {
log.Fatalf("error loading identity: %s", err)
}
nodeID = ident.ID
if nodeID.IsZero() {
log.Fatal("empty node ID")
}
var ctx context.Context
ctx, cancel = context.WithCancel(context.Background())
ctx, cancel = process.Ctx(cmd)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
@ -112,7 +119,13 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
}()
loopFunc := func(ctx context.Context) (err error) {
if err := update(ctx, ident.ID); err != nil {
if err := update(ctx, runCfg.BinaryLocation, runCfg.ServiceName, renameStoragenode); err != nil {
// don't finish loop in case of error just wait for another execution
log.Println(err)
}
updaterBinName := os.Args[0]
if err := update(ctx, updaterBinName, updaterServiceName, renameUpdater); err != nil {
// don't finish loop in case of error just wait for another execution
log.Println(err)
}
@ -136,24 +149,30 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return nil
}
func update(ctx context.Context, nodeID storj.NodeID) (err error) {
client := checker.New(runCfg.ClientConfig)
func update(ctx context.Context, binPath, serviceName string, renameBinary renameFunc) (err error) {
if nodeID.IsZero() {
log.Fatal("empty node ID")
}
currentVersion, err := binaryVersion(runCfg.BinaryLocation)
var currentVersion version.SemVer
if serviceName == updaterServiceName {
// TODO: find better way to check this binary version
currentVersion = version.Build.Version
} else {
currentVersion, err = binaryVersion(binPath)
if err != nil {
return errs.Wrap(err)
}
}
client := checker.New(runCfg.ClientConfig)
log.Println("downloading versions from", runCfg.ServerAddress)
shouldUpdate, newVersion, err := client.ShouldUpdate(ctx, runCfg.ServiceName, nodeID)
shouldUpdate, newVersion, err := client.ShouldUpdate(ctx, serviceName, nodeID)
if err != nil {
return errs.Wrap(err)
}
if shouldUpdate {
downloadURL := newVersion.URL
downloadURL = strings.Replace(downloadURL, "{os}", runtime.GOOS, 1)
downloadURL = strings.Replace(downloadURL, "{arch}", runtime.GOARCH, 1)
// TODO: consolidate semver.Version and version.SemVer
suggestedVersion, err := newVersion.SemVer()
if err != nil {
@ -161,12 +180,13 @@ func update(ctx context.Context, nodeID storj.NodeID) (err error) {
}
if currentVersion.Compare(suggestedVersion) < 0 {
tempArchive, err := ioutil.TempFile(os.TempDir(), runCfg.ServiceName)
tempArchive, err := ioutil.TempFile(os.TempDir(), serviceName)
if err != nil {
return errs.New("cannot create temporary archive: %v", err)
}
defer func() { err = errs.Combine(err, os.Remove(tempArchive.Name())) }()
downloadURL := parseDownloadURL(newVersion.URL)
log.Println("start downloading", downloadURL, "to", tempArchive.Name())
err = downloadArchive(ctx, tempArchive, downloadURL)
if err != nil {
@ -174,24 +194,19 @@ func update(ctx context.Context, nodeID storj.NodeID) (err error) {
}
log.Println("finished downloading", downloadURL, "to", tempArchive.Name())
extension := filepath.Ext(runCfg.BinaryLocation)
if extension != "" {
extension = "." + extension
}
dir := filepath.Dir(runCfg.BinaryLocation)
backupExec := filepath.Join(dir, runCfg.ServiceName+".old."+currentVersion.String()+extension)
if err = os.Rename(runCfg.BinaryLocation, backupExec); err != nil {
return errs.Wrap(err)
}
err = unpackBinary(ctx, tempArchive.Name(), runCfg.BinaryLocation)
err = renameBinary(currentVersion)
if err != nil {
return errs.Wrap(err)
}
downloadedVersion, err := binaryVersion(runCfg.BinaryLocation)
err = unpackBinary(ctx, tempArchive.Name(), binPath)
if err != nil {
return errs.Wrap(err)
}
// TODO add here recovery even before starting service (if version command cannot be executed)
downloadedVersion, err := binaryVersion(binPath)
if err != nil {
return errs.Wrap(err)
}
@ -200,13 +215,13 @@ func update(ctx context.Context, nodeID storj.NodeID) (err error) {
return errs.New("invalid version downloaded: wants %s got %s", suggestedVersion.String(), downloadedVersion.String())
}
log.Println("restarting service", runCfg.ServiceName)
err = restartSNService(runCfg.ServiceName)
log.Println("restarting service", serviceName)
err = restartService(serviceName)
if err != nil {
// TODO: should we try to recover from this?
return errs.New("unable to restart service: %v", err)
}
log.Println("service", runCfg.ServiceName, "restarted successfully")
log.Println("service", serviceName, "restarted successfully")
// TODO remove old binary ??
return nil
@ -217,9 +232,41 @@ func update(ctx context.Context, nodeID storj.NodeID) (err error) {
return nil
}
func renameStoragenode(currentVersion version.SemVer) error {
extension := filepath.Ext(runCfg.BinaryLocation)
dir := filepath.Dir(runCfg.BinaryLocation)
backupExec := filepath.Join(dir, runCfg.ServiceName+".old."+currentVersion.String()+extension)
if err := os.Rename(runCfg.BinaryLocation, backupExec); err != nil {
return errs.Wrap(err)
}
return nil
}
func renameUpdater(_ version.SemVer) error {
updaterBinName := os.Args[0]
extension := filepath.Ext(updaterBinName)
dir := filepath.Dir(updaterBinName)
base := filepath.Base(updaterBinName)
base = base[:len(base)-len(extension)]
backupExec := filepath.Join(dir, base+".old"+extension)
if err := os.Rename(updaterBinName, backupExec); err != nil {
return errs.Wrap(err)
}
return nil
}
func parseDownloadURL(template string) string {
url := strings.Replace(template, "{os}", runtime.GOOS, 1)
url = strings.Replace(url, "{arch}", runtime.GOARCH, 1)
return url
}
func binaryVersion(location string) (version.SemVer, error) {
out, err := exec.Command(location, "version").Output()
out, err := exec.Command(location, "version").CombinedOutput()
if err != nil {
log.Printf("command output: %s", out)
return version.SemVer{}, err
}
@ -283,16 +330,26 @@ func unpackBinary(ctx context.Context, archive, target string) (err error) {
return nil
}
func restartSNService(name string) error {
func restartService(name string) error {
switch runtime.GOOS {
case "windows":
// TODO how run this as one command `net stop servicename && net start servicename`?
// TODO: combine stdout with err if err
_, err := exec.Command("net", "stop", name).CombinedOutput()
restartSvcBatPath := filepath.Join(os.TempDir(), "restartservice.bat")
restartSvcBat, err := os.Create(restartSvcBatPath)
if err != nil {
return err
}
_, err = exec.Command("net", "start", name).CombinedOutput()
restartStr := fmt.Sprintf("net stop %s && net start %s", name, name)
_, err = restartSvcBat.WriteString(restartStr)
if err != nil {
return err
}
if err := restartSvcBat.Close(); err != nil {
return err
}
_, err = exec.Command(restartSvcBat.Name()).CombinedOutput()
if err != nil {
return err
}
@ -311,5 +368,22 @@ func fileExists(filename string) bool {
}
func main() {
_ = rootCmd.Execute()
process.Exec(rootCmd)
}
// TODO: improve logging; other commands use zap but due to an apparent
// windows bug we're unable to use the existing process logging infrastructure.
func openLog() (closeFunc func() error, err error) {
closeFunc = func() error { return nil }
if runCfg.Log != "" {
logFile, err := os.OpenFile(runCfg.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Printf("error opening log file: %s", err)
return closeFunc, err
}
log.SetOutput(logFile)
return logFile.Close, nil
}
return closeFunc, nil
}

View File

@ -4,68 +4,197 @@
package main_test
import (
"archive/zip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/storj"
"storj.io/storj/versioncontrol"
)
func TestAutoUpdater(t *testing.T) {
const (
oldVersion = "v0.19.0"
newVersion = "v0.19.5"
)
func TestAutoUpdater_unix(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("requires storagenode and storagenode-updater to be installed as windows services")
}
// TODO cleanup `.exe` extension for different OS
ctx := testcontext.New(t)
defer ctx.Cleanup()
testFiles := []struct {
src string
dst string
perms os.FileMode
}{
{
"testdata/fake-storagenode",
ctx.File("storagenode"),
0755,
},
{
"testdata/fake-ident.cert",
ctx.File("identity.cert"),
0644,
},
{
"testdata/fake-ident.key",
ctx.File("identity.key"),
0600,
},
}
for _, file := range testFiles {
content, err := ioutil.ReadFile(file.src)
oldSemVer, err := version.NewSemVer(oldVersion)
require.NoError(t, err)
err = ioutil.WriteFile(file.dst, content, file.perms)
newSemVer, err := version.NewSemVer(newVersion)
require.NoError(t, err)
oldInfo := version.Info{
Timestamp: time.Now(),
CommitHash: "",
Version: oldSemVer,
Release: false,
}
// build real bin with old version, will be used for both storagenode and updater
oldBin := ctx.CompileWithVersion("", oldInfo)
storagenodePath := ctx.File("fake", "storagenode.exe")
copy(ctx, t, oldBin, storagenodePath)
updaterPath := ctx.File("fake", "storagenode-updater.exe")
move(t, oldBin, updaterPath)
// build real storagenode and updater with new version
newInfo := version.Info{
Timestamp: time.Now(),
CommitHash: "",
Version: newSemVer,
Release: false,
}
newBin := ctx.CompileWithVersion("", newInfo)
updateBins := map[string]string{
"storagenode": newBin,
"storagenode-updater": newBin,
}
// run versioncontrol and update zips http servers
versionControlPeer, cleanupVersionControl := testVersionControlWithUpdates(ctx, t, updateBins)
defer cleanupVersionControl()
logPath := ctx.File("storagenode-updater.log")
// write identity files to disk for use in rollout calculation
identConfig := testIdentityFiles(ctx, t)
// run updater (update)
args := []string{"run"}
args = append(args, "--config-dir", ctx.Dir())
args = append(args, "--server-address", "http://"+versionControlPeer.Addr())
args = append(args, "--binary-location", storagenodePath)
args = append(args, "--check-interval", "0s")
args = append(args, "--identity.cert-path", identConfig.CertPath)
args = append(args, "--identity.key-path", identConfig.KeyPath)
args = append(args, "--log", logPath)
// NB: updater currently uses `log.SetOutput` so all output after that call
// only goes to the log file.
out, err := exec.Command(updaterPath, args...).CombinedOutput()
logData, logErr := ioutil.ReadFile(logPath)
if assert.NoError(t, logErr) {
logStr := string(logData)
t.Log(logStr)
if !assert.Contains(t, logStr, "storagenode restarted successfully") {
t.Log(logStr)
}
if !assert.Contains(t, logStr, "storagenode-updater restarted successfully") {
t.Log(logStr)
}
} else {
t.Log(string(out))
}
if !assert.NoError(t, err) {
t.FailNow()
}
oldStoragenode := ctx.File("fake", "storagenode"+".old."+oldVersion+".exe")
oldStoragenodeInfo, err := os.Stat(oldStoragenode)
require.NoError(t, err)
require.NotNil(t, oldStoragenodeInfo)
require.NotZero(t, oldStoragenodeInfo.Size())
backupUpdater := ctx.File("fake", "storagenode-updater.old.exe")
backupUpdaterInfo, err := os.Stat(backupUpdater)
require.NoError(t, err)
require.NotNil(t, backupUpdaterInfo)
require.NotZero(t, backupUpdaterInfo.Size())
}
func move(t *testing.T, src, dst string) {
err := os.Rename(src, dst)
require.NoError(t, err)
}
func copy(ctx *testcontext.Context, t *testing.T, src, dst string) {
s, err := os.Open(src)
require.NoError(t, err)
defer ctx.Check(s.Close)
d, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, 0755)
require.NoError(t, err)
defer ctx.Check(d.Close)
_, err = io.Copy(d, s)
require.NoError(t, err)
}
func testIdentityFiles(ctx *testcontext.Context, t *testing.T) identity.Config {
t.Helper()
ident, err := testidentity.PregeneratedIdentity(0, storj.LatestIDVersion())
require.NoError(t, err)
identConfig := identity.Config{
CertPath: ctx.File("identity", "identity.cert"),
KeyPath: ctx.File("identity", "identity.Key"),
}
err = identConfig.Save(ident)
require.NoError(t, err)
configData := fmt.Sprintf(
"identity.cert-path: %s\nidentity.key-path: %s",
identConfig.CertPath,
identConfig.KeyPath,
)
err = ioutil.WriteFile(ctx.File("config.yaml"), []byte(configData), 0644)
require.NoError(t, err)
return identConfig
}
func testVersionControlWithUpdates(ctx *testcontext.Context, t *testing.T, updateBins map[string]string) (peer *versioncontrol.Peer, cleanup func()) {
t.Helper()
var mux http.ServeMux
content, err := ioutil.ReadFile("testdata/fake-storagenode.zip")
for name, src := range updateBins {
dst := ctx.File("updates", name+".zip")
zipBin(ctx, t, dst, src)
zipData, err := ioutil.ReadFile(dst)
require.NoError(t, err)
mux.HandleFunc("/download", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(content)
mux.HandleFunc("/"+name, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(zipData)
require.NoError(t, err)
}))
}
ts := httptest.NewServer(&mux)
defer ts.Close()
config := &versioncontrol.Config{
Address: "127.0.0.1:0",
// TODO: add STORJ_VERSION_SERVER_ADDR property to Product.wxs for testing
// TODO: set address back to `127.0.0.1:0`
Address: "127.0.0.1:10000",
// NB: this config field is required for versioncontrol to run.
Versions: versioncontrol.OldVersionConfig{
Satellite: "v0.0.1",
Storagenode: "v0.0.1",
@ -73,11 +202,22 @@ func TestAutoUpdater(t *testing.T) {
Gateway: "v0.0.1",
Identity: "v0.0.1",
},
// TODO use random seed
Binary: versioncontrol.ProcessesConfig{
Storagenode: versioncontrol.ProcessConfig{
Suggested: versioncontrol.VersionConfig{
Version: "0.19.5",
URL: ts.URL + "/download",
Version: newVersion,
URL: ts.URL + "/storagenode",
},
Rollout: versioncontrol.RolloutConfig{
Seed: "0000000000000000000000000000000000000000000000000000000000000001",
Cursor: 100,
},
},
StoragenodeUpdater: versioncontrol.ProcessConfig{
Suggested: versioncontrol.VersionConfig{
Version: newVersion,
URL: ts.URL + "/storagenode-updater",
},
Rollout: versioncontrol.RolloutConfig{
Seed: "0000000000000000000000000000000000000000000000000000000000000001",
@ -91,31 +231,30 @@ func TestAutoUpdater(t *testing.T) {
ctx.Go(func() error {
return peer.Run(ctx)
})
defer ctx.Check(peer.Close)
args := make([]string, 0)
args = append(args, "run")
args = append(args, "main.go")
args = append(args, "run")
args = append(args, "--server-address")
args = append(args, "http://"+peer.Addr())
args = append(args, "--binary-location")
args = append(args, testFiles[0].dst)
args = append(args, "--check-interval")
args = append(args, "0s")
args = append(args, "--identity.cert-path")
args = append(args, testFiles[1].dst)
args = append(args, "--identity.key-path")
args = append(args, testFiles[2].dst)
out, err := exec.Command("go", args...).CombinedOutput()
result := string(out)
if !assert.NoError(t, err) {
t.Log(result)
t.Fatal(err)
}
if !assert.Contains(t, result, "restarted successfully") {
t.Log(result)
return peer, func() {
ts.Close()
ctx.Check(peer.Close)
}
}
func zipBin(ctx *testcontext.Context, t *testing.T, dst, src string) {
t.Helper()
zipFile, err := os.Create(dst)
require.NoError(t, err)
base := filepath.Base(dst)
base = base[:len(base)-len(".zip")]
writer := zip.NewWriter(zipFile)
defer ctx.Check(writer.Close)
contents, err := writer.Create(base)
require.NoError(t, err)
data, err := ioutil.ReadFile(src)
require.NoError(t, err)
_, err = contents.Write(data)
require.NoError(t, err)
}

View File

@ -9,7 +9,10 @@ import (
"path"
"path/filepath"
"runtime"
"strconv"
"testing"
"storj.io/storj/internal/version"
)
// CLibMath is the standard C math library (see `man math.h`).
@ -24,7 +27,7 @@ type CompileCOptions struct {
}
// Compile compiles the specified package and returns the executable name.
func (ctx *Context) Compile(pkg string) string {
func (ctx *Context) Compile(pkg string, preArgs ...string) string {
ctx.test.Helper()
var binName string
@ -37,7 +40,7 @@ func (ctx *Context) Compile(pkg string) string {
exe := ctx.File("build", binName+".exe")
args := []string{"build"}
args := append([]string{"build"}, preArgs...)
if raceEnabled {
args = append(args, "-race")
}
@ -58,6 +61,36 @@ func (ctx *Context) Compile(pkg string) string {
return exe
}
// CompileWithVersion compiles the specified package with the version variables set
// to the passed version info values and returns the executable name.
func (ctx *Context) CompileWithVersion(pkg string, info version.Info) string {
ctx.test.Helper()
ldFlagsX := map[string]string{
"storj.io/storj/internal/version.buildTimestamp": strconv.Itoa(int(info.Timestamp.Unix())),
"storj.io/storj/internal/version.buildCommitHash": info.CommitHash,
"storj.io/storj/internal/version.buildVersion": info.Version.String(),
"storj.io/storj/internal/version.buildRelease": strconv.FormatBool(info.Release),
}
return ctx.CompileWithLDFlagsX(pkg, ldFlagsX)
}
// CompileWithLDFlagsX compiles the specified package with the -ldflags flag set to
// "-s -w [-X <key>=<value>,...]" given the passed map and returns the executable name.
func (ctx *Context) CompileWithLDFlagsX(pkg string, ldFlagsX map[string]string) string {
ctx.test.Helper()
var ldFlags = "-s -w"
if ldFlagsX != nil {
for key, value := range ldFlagsX {
ldFlags += (" -X " + key + "=" + value)
}
}
return ctx.Compile(pkg, "-ldflags", ldFlags)
}
// CompileShared compiles pkg as c-shared.
// TODO: support inclusion from other directories
// (cgo header paths are currently relative to package root)

View File

@ -110,7 +110,7 @@ func (client *Client) Process(ctx context.Context, processName string) (process
}
processesValue := reflect.ValueOf(versions.Processes)
field := processesValue.FieldByName(strings.Title(processName))
field := processesValue.FieldByName(kebabToPascal(processName))
processNameErr := Error.New("invalid process name: %s\n", processName)
if field == (reflect.Value{}) {
@ -140,3 +140,7 @@ func (client *Client) ShouldUpdate(ctx context.Context, processName string, node
}
return false, version.Version{}, nil
}
func kebabToPascal(str string) string {
return strings.ReplaceAll(strings.Title(str), "-", "")
}

View File

@ -82,6 +82,7 @@ type AllowedVersions struct {
type Processes struct {
Satellite Process `json:"satellite"`
Storagenode Process `json:"storagenode"`
StoragenodeUpdater Process `json:"storagenode-updater"`
Uplink Process `json:"uplink"`
Gateway Process `json:"gateway"`
Identity Process `json:"identity"`

View File

@ -115,7 +115,7 @@ func TestShouldUpdate(t *testing.T) {
tolerance := total * 2 / 100 // 2%
for p := 10; p < 100; p += 10 {
var updates int
var rollouts int
percentage := p
cursor := version.PercentageToCursor(percentage)
@ -131,12 +131,12 @@ func TestShouldUpdate(t *testing.T) {
require.NoError(t, err)
if version.ShouldUpdate(rollout, nodeID) {
updates++
rollouts++
}
}
assert.Condition(t, func() bool {
diff := updates - (total * percentage / 100)
diff := rollouts - (total * percentage / 100)
return int(math.Abs(float64(diff))) < tolerance
})
}

View File

@ -51,6 +51,7 @@ type OldVersionConfig struct {
type ProcessesConfig struct {
Satellite ProcessConfig
Storagenode ProcessConfig
StoragenodeUpdater ProcessConfig
Uplink ProcessConfig
Gateway ProcessConfig
Identity ProcessConfig
@ -161,6 +162,11 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.StoragenodeUpdater, err = configToProcess(config.Binary.StoragenodeUpdater)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.Uplink, err = configToProcess(config.Binary.Uplink)
if err != nil {
return nil, RolloutErr.Wrap(err)