storj/scripts/test-network-stalls.go

231 lines
6.5 KiB
Go
Raw Permalink Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
//go:build ignore
// +build ignore
// Tests whether the uplink tool correctly times out when one of the storage nodes it's talking to
// suddenly stops responding. In particular, this currently tests that happening during a Delete
// operation, because that is where we have observed indefinite hangs before.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"math/rand"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"time"
"github.com/zeebo/errs"
"storj.io/common/memory"
)
// Command-line flags that tune a test run.
var (
	deleteTimeout = flag.Duration("timeout", 60*time.Second, "how long to wait for a delete to succeed or time out")
	bucketName    = flag.String("bucket", "bukkit", "name of bucket to use for test")
	numTries      = flag.Int("num-tries", 20, "number of tries to cause a hang")
)

// fileSize is the size of the random test file that gets uploaded. The
// default can be overridden with the file-size flag registered in init.
var fileSize memory.Size = 5 * memory.MiB

// tryAgain is the sentinel returned by deleteWhileStallingAndCheck when the
// delete went through without the stall being observed, so the attempt has
// to be repeated.
var tryAgain = errs.New("test needs to run again")
// init binds the file-size flag to fileSize. flag.Var returns nothing, so
// unlike the other flags it cannot be registered from a var initializer.
func init() {
flag.Var(&fileSize, "file-size", "size of test file to use")
}
// randDefaultSource is a reader that produces pseudo-random bytes from the
// package-level generator of math/rand.
type randDefaultSource struct{}

// Read fills p with pseudo-random bytes by delegating to rand.Read and hands
// back whatever count and error that call reports.
func (*randDefaultSource) Read(p []byte) (n int, err error) {
	n, err = rand.Read(p)
	return n, err
}
// makeRandomContentsFile creates (or truncates) the file at path and fills it
// with size bytes read from a randDefaultSource.
//
// The returned error combines any failure from creating or writing the file
// with the error from closing it.
func makeRandomContentsFile(path string, size memory.Size) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Close runs on every path past this point; its error is folded into the
	// named result so a failed close is not lost.
	defer func() { err = errs.Combine(err, f.Close()) }()

	_, err = io.CopyN(f, &randDefaultSource{}, int64(size))
	return err
}
// uplinkRunner invokes the uplink command-line tool with a fixed
// configuration directory and log level.
type uplinkRunner struct {
	// execName is the name or path of the executable to run.
	execName string
	// configDir is passed to every invocation as --config-dir.
	configDir string
	// logLevel is passed to every invocation as --log.level.
	logLevel string
}

// Run runs the uplink executable with the given arguments, and hands back its
// combined stdout and stderr as well as an error if there were any problems
// with the execution or if the uplink exited non-zero.
//
// A nil ctx is treated as context.Background(). When the command fails, the
// returned error wraps the underlying one (so errors.Is / errors.As still
// work) and additionally names the command that was run and, if there was
// any, the output it produced; a bare "exit status 1" is not enough to
// diagnose a failed test run.
func (ur *uplinkRunner) Run(ctx context.Context, args ...string) ([]byte, error) {
	if ctx == nil {
		// exec.CommandContext panics on a nil context.
		ctx = context.Background()
	}

	cmdArgs := make([]string, 0, 4+len(args))
	cmdArgs = append(cmdArgs, "--config-dir", ur.configDir, "--log.level", ur.logLevel)
	cmdArgs = append(cmdArgs, args...)

	output, err := exec.CommandContext(ctx, ur.execName, cmdArgs...).CombinedOutput()
	switch {
	case err == nil:
		return output, nil
	case len(output) == 0:
		return output, fmt.Errorf("%s %q: %w", ur.execName, args, err)
	default:
		return output, fmt.Errorf("%s %q: %w; output: %s", ur.execName, args, err, output)
	}
}
// lsOutputRegexp picks the trailing part out of a listing line: it skips the
// first four whitespace-delimited fields and captures the rest, up to the end
// of the line, in group 1. The (?m) flag makes ^ and $ anchor at line
// boundaries so the pattern can be applied to multi-line output in one go.
// NOTE(review): \s also matches newlines, so on a line with fewer than five
// fields the skipped fields may be gathered across a line break.
var lsOutputRegexp = regexp.MustCompile(`(?m)^\s*(?:\S+\s+){4}(.*)$`)
// doesRemoteExist reports whether remotePath appears in the output of
// "uplink ls" for its parent. remotePath is split at its last "/": everything
// before it is listed, and the part after it is compared against the name
// captured by lsOutputRegexp on each output line.
//
// It returns an error if remotePath contains no "/" or if the listing
// command fails.
func (ur *uplinkRunner) doesRemoteExist(remotePath string) (bool, error) {
	sep := strings.LastIndex(remotePath, "/")
	if sep < 0 {
		return false, errs.New("invalid remote path %q", remotePath)
	}
	bucketAndDir := remotePath[:sep]
	filenamePart := []byte(remotePath[sep+1:])

	// A real context is passed instead of nil; Run would substitute
	// context.Background() anyway, but a nil Context should never be handed
	// to a function.
	output, err := ur.Run(context.Background(), "ls", bucketAndDir)
	if err != nil {
		return false, err
	}

	for _, matches := range lsOutputRegexp.FindAllSubmatch(output, -1) {
		if bytes.Equal(matches[1], filenamePart) {
			return true, nil
		}
	}
	return false, nil
}
// storeFileAndCheck uploads srcFile to dstFile with "uplink cp" and then
// verifies, via doesRemoteExist, that the object is listed afterwards.
//
// It returns an error if the copy fails, if the existence check fails, or if
// the object is not listed after a successful copy.
func storeFileAndCheck(uplink *uplinkRunner, srcFile, dstFile string) error {
	// A real context is passed instead of nil; a nil Context should never be
	// handed to a function even when the callee tolerates it.
	if _, err := uplink.Run(context.Background(), "cp", srcFile, dstFile); err != nil {
		return errs.New("Could not copy file into storj-sim network: %v", err)
	}

	exists, err := uplink.doesRemoteExist(dstFile)
	if err != nil {
		return errs.New("Could not check if file exists: %v", err)
	}
	if !exists {
		return errs.New("Copied file not present in storj-sim network!")
	}
	return nil
}
// stallNode freezes proc with SIGSTOP, blocks until ctx is done, and then
// lets it continue with SIGCONT. It is meant to be run in its own goroutine.
//
// A signal that cannot be delivered is reported on stderr rather than being
// dropped: if the SIGSTOP fails the node never stalls and the test run proves
// nothing, and if the SIGCONT fails the node stays frozen. The function still
// waits for ctx and still attempts the SIGCONT in either case.
func stallNode(ctx context.Context, proc *os.Process) {
	// send node a SIGSTOP, which causes it to freeze as if being traced
	if err := proc.Signal(syscall.SIGSTOP); err != nil {
		fmt.Fprintf(os.Stderr, "warning: could not send SIGSTOP to process %d: %v\n", proc.Pid, err)
	}

	// until the context is done
	<-ctx.Done()

	// then let the node continue again
	if err := proc.Signal(syscall.SIGCONT); err != nil {
		fmt.Fprintf(os.Stderr, "warning: could not send SIGCONT to process %d: %v\n", proc.Pid, err)
	}
}
// deleteWhileStallingAndCheck deletes dstFile with "uplink rm" while nodeProc
// is being stalled by stallNode, bounded by the -timeout flag.
//
// It returns:
//   - nil when the delete completed, the object is gone, and the uplink
//     output mentions "context deadline exceeded" (the uplink itself timed
//     out on the stalled node);
//   - tryAgain when the delete completed and the object is gone but no such
//     timeout showed up in the output;
//   - any other error when the uplink was still running at the test's own
//     deadline, the delete failed for another reason, the existence check
//     failed, or the object is still listed.
//
// The node has been sent its SIGCONT by the time this function returns.
func deleteWhileStallingAndCheck(uplink *uplinkRunner, dstFile string, nodeProc *os.Process) error {
	ctx, cancel := context.WithTimeout(context.Background(), *deleteTimeout)

	resumed := make(chan struct{})
	go func() {
		defer close(resumed)
		stallNode(ctx, nodeProc)
	}()
	// Cancelling alone only asks the goroutine to resume the node. Wait for
	// it as well, so that the caller never starts the next upload (or exits
	// the process) while the node is still frozen.
	defer func() {
		cancel()
		<-resumed
	}()

	output, err := uplink.Run(ctx, "rm", dstFile)
	if err != nil {
		if ctx.Err() == context.DeadlineExceeded {
			// (uplink did not time out, but this test did)
			return errs.New("uplink DID NOT time out waiting for stalled node 0 while issuing a delete")
		}
		return errs.New("Unexpected error trying to delete file %q from storj-sim network: %v", dstFile, err)
	}

	exists, err := uplink.doesRemoteExist(dstFile)
	if err != nil {
		return errs.New("Failed to check if remote file %q was deleted: %v", dstFile, err)
	}
	if exists {
		return errs.New("Deleted file still present in storj-sim network!")
	}

	if strings.Contains(string(output), "context deadline exceeded") {
		// the uplink correctly timed out when one of the target nodes was stalled! all is well
		return nil
	}
	// delete worked fine, which means our stall didn't hit at the right time and we need to try again
	return tryAgain
}
// runTest performs the whole stall test: it validates the storj-sim
// environment ($GATEWAY_0_DIR, $STORAGENODE_0_PID), creates a random test
// file and a bucket, and then repeatedly uploads the file and deletes it
// while storage node 0 is stalled, up to -num-tries times.
//
// The bucket and remote object are cleaned up on a best-effort basis on every
// path once the bucket has been created. The local temporary directory is
// removed only when the run did not fail, so that it can be inspected after
// a failure.
func runTest() error {
	ctx := context.Background()

	// check run environment
	configDir := os.Getenv("GATEWAY_0_DIR")
	if configDir == "" {
		return errs.New("This test should be run under storj-sim test ($GATEWAY_0_DIR not found).")
	}
	nodePid, err := strconv.Atoi(os.Getenv("STORAGENODE_0_PID"))
	if err != nil {
		return errs.New("Empty or invalid $STORAGENODE_0_PID: %v", err)
	}
	if nodePid <= 0 {
		// Zero and negative values do not name a single process (kill(2)
		// treats them as process groups), so refuse them instead of
		// signalling something unintended later on.
		return errs.New("Empty or invalid $STORAGENODE_0_PID: %d is not a valid process ID", nodePid)
	}
	nodeProc, err := os.FindProcess(nodePid)
	if err != nil {
		return errs.New("No such process %v! $STORAGENODE_0_PID is wrong", nodePid)
	}

	// set up test
	uplink := &uplinkRunner{
		execName:  "uplink",
		configDir: configDir,
		logLevel:  "error",
	}
	tempDir, err := os.MkdirTemp("", "storj-test-network-stalls.")
	if err != nil {
		return err
	}
	bucket := "sj://" + *bucketName
	srcFile := filepath.Join(tempDir, "to-storj-sim")
	dstFile := bucket + "/in-storj-sim"
	if err := makeRandomContentsFile(srcFile, fileSize); err != nil {
		return errs.New("could not create test file with random contents: %v", err)
	}
	if _, err := uplink.Run(ctx, "mb", bucket); err != nil {
		return errs.New("could not create test bucket: %v", err)
	}
	defer func() {
		// explicitly ignoring errors here; we don't much care if they fail,
		// because this is best-effort
		_, _ = uplink.Run(ctx, "rm", dstFile)
		_, _ = uplink.Run(ctx, "rb", bucket)
	}()

	// run test
	// NOTE(review): if every try comes back with tryAgain the loop simply
	// runs out and the function still returns success below — confirm that
	// "stall never observed" is meant to count as a pass.
	for i := 0; i < *numTries; i++ {
		fmt.Printf("%d\n", i)
		if err := storeFileAndCheck(uplink, srcFile, dstFile); err != nil {
			return err
		}
		err := deleteWhileStallingAndCheck(uplink, dstFile, nodeProc)
		if err == nil {
			// success!
			break
		}
		if err != tryAgain {
			// unexpected error
			return err
		}
	}

	// clean up test. this part isn't deferred and run unconditionally because
	// we want to inspect things when the test has failed.
	return os.RemoveAll(tempDir)
}
// main parses the command-line flags and runs the test. A failure is printed
// to stderr and the process exits with status 1; otherwise "SUCCESS" is
// printed to stdout.
func main() {
	flag.Parse()

	err := runTest()
	if err == nil {
		fmt.Println("SUCCESS")
		return
	}

	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}