2021-03-31 16:56:34 +01:00
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
2021-12-09 19:07:01 +00:00
"context"
"errors"
2021-04-06 20:19:11 +01:00
"fmt"
"io"
2021-03-31 16:56:34 +01:00
"strconv"
2021-12-09 19:07:01 +00:00
"strings"
2021-06-25 02:55:13 +01:00
"sync"
2022-03-07 00:54:48 +00:00
"time"
2021-03-31 16:56:34 +01:00
2021-04-06 20:19:11 +01:00
progressbar "github.com/cheggaaa/pb/v3"
2021-03-31 16:56:34 +01:00
"github.com/zeebo/clingy"
2021-04-06 20:19:11 +01:00
"github.com/zeebo/errs"
2021-05-06 17:56:57 +01:00
2022-04-22 12:40:38 +01:00
"storj.io/common/context2"
2021-12-09 19:07:01 +00:00
"storj.io/common/memory"
2022-04-15 15:10:39 +01:00
"storj.io/common/rpc/rpcpool"
2021-06-25 02:55:13 +01:00
"storj.io/common/sync2"
2022-01-06 19:55:46 +00:00
"storj.io/storj/cmd/uplink/ulext"
"storj.io/storj/cmd/uplink/ulfs"
"storj.io/storj/cmd/uplink/ulloc"
2021-03-31 16:56:34 +01:00
)
type cmdCp struct {
2021-05-26 21:19:29 +01:00
ex ulext . External
2021-03-31 16:56:34 +01:00
2021-12-16 15:19:50 +00:00
access string
recursive bool
transfers int
dryrun bool
progress bool
byteRange string
2022-03-07 00:54:48 +00:00
expires time . Time
2022-05-13 11:21:01 +01:00
metadata map [ string ] string
2021-04-06 20:19:11 +01:00
2021-12-16 15:19:50 +00:00
parallelism int
parallelismChunkSize memory . Size
2021-12-09 19:07:01 +00:00
2021-05-06 17:56:57 +01:00
source ulloc . Location
dest ulloc . Location
2021-03-31 16:56:34 +01:00
}
2022-05-12 20:35:02 +01:00
const maxPartCount int64 = 10000
2021-05-26 21:19:29 +01:00
func newCmdCp ( ex ulext . External ) * cmdCp {
return & cmdCp { ex : ex }
}
2021-03-31 16:56:34 +01:00
2021-05-26 21:19:29 +01:00
func ( c * cmdCp ) Setup ( params clingy . Parameters ) {
2021-06-25 17:51:05 +01:00
c . access = params . Flag ( "access" , "Access name or value to use" , "" ) . ( string )
2021-05-25 00:11:50 +01:00
c . recursive = params . Flag ( "recursive" , "Peform a recursive copy" , false ,
2021-03-31 16:56:34 +01:00
clingy . Short ( 'r' ) ,
2021-12-09 19:21:52 +00:00
clingy . Transform ( strconv . ParseBool ) , clingy . Boolean ,
2021-03-31 16:56:34 +01:00
) . ( bool )
2021-12-16 15:19:50 +00:00
c . transfers = params . Flag ( "transfers" , "Controls how many uploads/downloads to perform in parallel" , 1 ,
clingy . Short ( 't' ) ,
2021-06-25 02:55:13 +01:00
clingy . Transform ( strconv . Atoi ) ,
clingy . Transform ( func ( n int ) ( int , error ) {
if n <= 0 {
2022-01-04 14:50:14 +00:00
return 0 , errs . New ( "transfers must be at least 1" )
2021-06-25 02:55:13 +01:00
}
return n , nil
} ) ,
) . ( int )
2021-12-09 19:21:52 +00:00
c . dryrun = params . Flag ( "dry-run" , "Print what operations would happen but don't execute them" , false ,
clingy . Transform ( strconv . ParseBool ) , clingy . Boolean ,
2021-04-06 20:19:11 +01:00
) . ( bool )
2021-05-25 00:11:50 +01:00
c . progress = params . Flag ( "progress" , "Show a progress bar when possible" , true ,
2021-12-09 19:21:52 +00:00
clingy . Transform ( strconv . ParseBool ) , clingy . Boolean ,
2021-05-12 17:16:56 +01:00
) . ( bool )
2021-10-26 11:49:03 +01:00
c . byteRange = params . Flag ( "range" , "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35" , "" ) . ( string )
2021-03-31 16:56:34 +01:00
2022-04-26 21:34:14 +01:00
c . parallelism = params . Flag ( "parallelism" , "Controls how many parallel chunks to upload/download from a file" , 1 ,
2021-12-16 15:19:50 +00:00
clingy . Short ( 'p' ) ,
2021-12-09 19:07:01 +00:00
clingy . Transform ( strconv . Atoi ) ,
clingy . Transform ( func ( n int ) ( int , error ) {
if n <= 0 {
2022-01-04 14:50:14 +00:00
return 0 , errs . New ( "parallelism must be at least 1" )
2021-12-09 19:07:01 +00:00
}
return n , nil
} ) ,
) . ( int )
2022-04-22 12:07:26 +01:00
c . parallelismChunkSize = params . Flag ( "parallelism-chunk-size" , "Set the size of the chunks for parallelism, 0 means automatic adjustment" , memory . Size ( 0 ) ,
2021-12-09 19:07:01 +00:00
clingy . Transform ( memory . ParseString ) ,
2022-04-22 12:07:26 +01:00
clingy . Transform ( func ( n int64 ) ( memory . Size , error ) {
if n < 0 {
return 0 , errs . New ( "parallelism-chunk-size cannot be below 0" )
}
return memory . Size ( n ) , nil
} ) ,
2021-12-09 19:07:01 +00:00
) . ( memory . Size )
2022-03-07 00:54:48 +00:00
c . expires = params . Flag ( "expires" ,
"Schedule removal after this time (e.g. '+2h', 'now', '2020-01-02T15:04:05Z0700')" ,
2022-03-14 12:45:47 +00:00
time . Time { } , clingy . Transform ( parseHumanDate ) , clingy . Type ( "relative_date" ) ) . ( time . Time )
2022-03-07 00:54:48 +00:00
2022-05-13 11:21:01 +01:00
c . metadata = params . Flag ( "metadata" ,
"optional metadata for the object. Please use a single level JSON object of string to string only" ,
nil , clingy . Transform ( parseJSON ) , clingy . Type ( "string" ) ) . ( map [ string ] string )
2022-05-26 17:24:27 +01:00
c . source = params . Arg ( "source" , "Source to copy, use - for standard input" , clingy . Transform ( ulloc . Parse ) ) . ( ulloc . Location )
c . dest = params . Arg ( "dest" , "Destination to copy, use - for standard output" , clingy . Transform ( ulloc . Parse ) ) . ( ulloc . Location )
2021-03-31 16:56:34 +01:00
}
2022-08-30 10:51:31 +01:00
func ( c * cmdCp ) Execute ( ctx context . Context ) error {
2022-04-15 15:10:39 +01:00
fs , err := c . ex . OpenFilesystem ( ctx , c . access , ulext . ConnectionPoolOptions ( rpcpool . Options {
Capacity : 100 * c . parallelism ,
KeyCapacity : 5 ,
IdleExpiration : 2 * time . Minute ,
} ) )
2021-04-06 20:19:11 +01:00
if err != nil {
return err
}
defer func ( ) { _ = fs . Close ( ) } ( )
2021-06-25 02:55:13 +01:00
// we ensure the source and destination are lexically directoryish
// if they map to directories. the destination is always converted to be
// directoryish if the copy is recursive.
if fs . IsLocalDir ( ctx , c . source ) {
c . source = c . source . AsDirectoryish ( )
}
if c . recursive || fs . IsLocalDir ( ctx , c . dest ) {
c . dest = c . dest . AsDirectoryish ( )
}
2021-04-06 20:19:11 +01:00
if c . recursive {
2021-12-09 19:07:01 +00:00
if c . byteRange != "" {
return errs . New ( "unable to do recursive copy with byte range" )
}
2021-04-06 20:19:11 +01:00
return c . copyRecursive ( ctx , fs )
}
2021-06-25 02:55:13 +01:00
// if the destination is directoryish, we add the basename of the source
// to the end of the destination to pick a filename.
var base string
if c . dest . Directoryish ( ) && ! c . source . Std ( ) {
// we undirectoryish the source so that we ignore any trailing slashes
// when finding the base name.
var ok bool
base , ok = c . source . Undirectoryish ( ) . Base ( )
if ! ok {
return errs . New ( "destination is a directory and cannot find base name for source %q" , c . source )
}
}
c . dest = joinDestWith ( c . dest , base )
if ! c . source . Std ( ) && ! c . dest . Std ( ) {
2022-08-30 10:51:31 +01:00
fmt . Fprintln ( clingy . Stdout ( ctx ) , copyVerb ( c . source , c . dest ) , c . source , "to" , c . dest )
2021-06-25 02:55:13 +01:00
}
2021-05-12 17:16:56 +01:00
return c . copyFile ( ctx , fs , c . source , c . dest , c . progress )
2021-04-06 20:19:11 +01:00
}
2022-08-30 10:51:31 +01:00
func ( c * cmdCp ) copyRecursive ( ctx context . Context , fs ulfs . Filesystem ) error {
2021-04-06 20:19:11 +01:00
if c . source . Std ( ) || c . dest . Std ( ) {
return errs . New ( "cannot recursively copy to stdin/stdout" )
}
2021-10-02 00:47:53 +01:00
iter , err := fs . List ( ctx , c . source , & ulfs . ListOptions {
Recursive : true ,
} )
2021-04-06 20:19:11 +01:00
if err != nil {
return err
}
2021-06-25 02:55:13 +01:00
var (
2021-12-16 15:19:50 +00:00
limiter = sync2 . NewLimiter ( c . transfers )
2021-06-25 02:55:13 +01:00
es errs . Group
mu sync . Mutex
)
fprintln := func ( w io . Writer , args ... interface { } ) {
mu . Lock ( )
defer mu . Unlock ( )
fmt . Fprintln ( w , args ... )
}
addError := func ( err error ) {
2022-06-01 07:19:09 +01:00
if err == nil {
return
}
2021-06-25 02:55:13 +01:00
mu . Lock ( )
defer mu . Unlock ( )
es . Add ( err )
}
2021-04-06 20:19:11 +01:00
for iter . Next ( ) {
2021-06-25 02:55:13 +01:00
source := iter . Item ( ) . Loc
rel , err := c . source . RelativeTo ( source )
2021-04-06 20:19:11 +01:00
if err != nil {
return err
}
2021-06-25 02:55:13 +01:00
dest := joinDestWith ( c . dest , rel )
2021-04-06 20:19:11 +01:00
2021-06-25 02:55:13 +01:00
ok := limiter . Go ( ctx , func ( ) {
2022-08-30 10:51:31 +01:00
fprintln ( clingy . Stdout ( ctx ) , copyVerb ( source , dest ) , source , "to" , dest )
2021-04-06 20:19:11 +01:00
2021-06-25 02:55:13 +01:00
if err := c . copyFile ( ctx , fs , source , dest , false ) ; err != nil {
2022-08-30 10:51:31 +01:00
fprintln ( clingy . Stdout ( ctx ) , copyVerb ( source , dest ) , "failed:" , err . Error ( ) )
2021-06-25 02:55:13 +01:00
addError ( err )
}
} )
if ! ok {
break
2021-04-06 20:19:11 +01:00
}
}
2021-06-25 02:55:13 +01:00
limiter . Wait ( )
2021-04-06 20:19:11 +01:00
if err := iter . Err ( ) ; err != nil {
return errs . Wrap ( err )
2021-06-25 02:55:13 +01:00
} else if len ( es ) > 0 {
return es . Err ( )
2021-04-06 20:19:11 +01:00
}
2021-03-31 16:56:34 +01:00
return nil
}
2021-04-06 20:19:11 +01:00
2022-08-30 10:51:31 +01:00
func ( c * cmdCp ) copyFile ( ctx context . Context , fs ulfs . Filesystem , source , dest ulloc . Location , progress bool ) error {
2021-04-06 20:19:11 +01:00
if c . dryrun {
return nil
}
2022-03-15 10:07:19 +00:00
if dest . Remote ( ) && source . Remote ( ) {
return fs . Copy ( ctx , source , dest )
}
2021-12-09 19:07:01 +00:00
offset , length , err := parseRange ( c . byteRange )
if err != nil {
return errs . Wrap ( err )
2021-12-09 18:36:11 +00:00
}
2021-10-26 11:49:03 +01:00
2021-12-09 18:36:11 +00:00
mrh , err := fs . Open ( ctx , source )
if err != nil {
return err
}
defer func ( ) { _ = mrh . Close ( ) } ( )
2022-05-13 11:21:01 +01:00
mwh , err := fs . Create ( ctx , dest , & ulfs . CreateOptions {
Expires : c . expires ,
Metadata : c . metadata ,
} )
2021-12-09 19:03:42 +00:00
if err != nil {
return err
}
defer func ( ) { _ = mwh . Abort ( ctx ) } ( )
2021-04-06 20:19:11 +01:00
var bar * progressbar . ProgressBar
2021-12-09 19:07:01 +00:00
if progress && ! c . dest . Std ( ) {
2022-08-30 10:51:31 +01:00
bar = progressbar . New64 ( 0 ) . SetWriter ( clingy . Stdout ( ctx ) )
2021-04-06 20:19:11 +01:00
defer bar . Finish ( )
}
2022-05-12 21:40:26 +01:00
partSize , err := c . calculatePartSize ( mrh . Length ( ) , c . parallelismChunkSize . Int64 ( ) )
2022-05-12 20:35:02 +01:00
if err != nil {
return err
}
2022-05-27 14:16:58 +01:00
return errs . Wrap ( c . parallelCopy (
2021-12-09 19:07:01 +00:00
ctx ,
mwh , mrh ,
2022-05-12 20:35:02 +01:00
c . parallelism , partSize ,
2021-12-09 19:07:01 +00:00
offset , length ,
bar ,
) )
2021-04-06 20:19:11 +01:00
}
2022-05-12 20:35:02 +01:00
// calculatePartSize returns the needed part size in order to upload the file with size of 'length'.
// It hereby respects if the client requests/prefers a certain size and only increases if needed.
2022-05-12 21:40:26 +01:00
// If length is -1 (ie. stdin input), then this will limit to 64MiB and the total file length to 640GB.
2022-05-12 20:35:02 +01:00
func ( c * cmdCp ) calculatePartSize ( length , preferredSize int64 ) ( requiredSize int64 , err error ) {
segC := ( length / maxPartCount / ( memory . MiB * 64 ) . Int64 ( ) ) + 1
requiredSize = segC * ( memory . MiB * 64 ) . Int64 ( )
switch {
case preferredSize == 0 :
return requiredSize , nil
case requiredSize <= preferredSize :
return preferredSize , nil
default :
return 0 , errs . New ( fmt . Sprintf ( "the specified chunk size %s is too small, requires %s or larger" ,
memory . FormatBytes ( preferredSize ) , memory . FormatBytes ( requiredSize ) ) )
}
}
2021-05-06 17:56:57 +01:00
func copyVerb ( source , dest ulloc . Location ) string {
2021-04-06 20:19:11 +01:00
switch {
2021-05-06 17:56:57 +01:00
case dest . Remote ( ) :
2021-04-06 20:19:11 +01:00
return "upload"
2021-05-06 17:56:57 +01:00
case source . Remote ( ) :
2021-04-06 20:19:11 +01:00
return "download"
default :
return "copy"
}
}
2021-06-25 02:55:13 +01:00
func joinDestWith ( dest ulloc . Location , suffix string ) ulloc . Location {
dest = dest . AppendKey ( suffix )
// if the destination is local and directoryish, remove any
// trailing slashes that it has. this makes it so that if
// a remote file is name "foo/", then we copy it down as
// just "foo".
if dest . Local ( ) && dest . Directoryish ( ) {
dest = dest . Undirectoryish ( )
}
return dest
}
2021-12-09 19:07:01 +00:00
2022-05-27 14:16:58 +01:00
func ( c * cmdCp ) parallelCopy (
2022-08-30 10:51:31 +01:00
clctx context . Context ,
2021-12-09 19:07:01 +00:00
dst ulfs . MultiWriteHandle ,
src ulfs . MultiReadHandle ,
p int , chunkSize int64 ,
offset , length int64 ,
bar * progressbar . ProgressBar ) error {
if offset != 0 {
if err := src . SetOffset ( offset ) ; err != nil {
return err
}
}
var (
limiter = sync2 . NewLimiter ( p )
es errs . Group
mu sync . Mutex
)
ctx , cancel := context . WithCancel ( clctx )
defer func ( ) { _ = src . Close ( ) } ( )
2022-04-22 12:40:38 +01:00
defer func ( ) {
nocancel := context2 . WithoutCancellation ( ctx )
timedctx , cancel := context . WithTimeout ( nocancel , 5 * time . Second )
defer cancel ( )
_ = dst . Abort ( timedctx )
} ( )
2021-12-09 19:07:01 +00:00
defer cancel ( )
2022-05-27 16:08:29 +01:00
addError := func ( err error ) {
2022-06-01 07:19:09 +01:00
if err == nil {
return
}
2022-05-27 16:08:29 +01:00
mu . Lock ( )
defer mu . Unlock ( )
es . Add ( err )
// abort all other concurrenty copies
cancel ( )
}
2022-06-01 07:19:09 +01:00
var readBufs * ulfs . BytesPool
if p > 1 && c . dest . Std ( ) {
// Create the read buffer pool only for downloads to stdout with parallelism > 1.
readBufs = ulfs . NewBytesPool ( int ( chunkSize ) )
}
2021-12-09 19:07:01 +00:00
for i := 0 ; length != 0 ; i ++ {
i := i
chunk := chunkSize
if length > 0 && chunkSize > length {
chunk = length
}
length -= chunk
rh , err := src . NextPart ( ctx , chunk )
2022-05-27 16:08:29 +01:00
if err != nil {
if ! errors . Is ( err , io . EOF ) {
addError ( errs . New ( "error getting reader for part %d: %v" , i , err ) )
}
2021-12-09 19:07:01 +00:00
break
}
wh , err := dst . NextPart ( ctx , chunk )
if err != nil {
_ = rh . Close ( )
2022-05-27 16:08:29 +01:00
addError ( errs . New ( "error getting writer for part %d: %v" , i , err ) )
break
2021-12-09 19:07:01 +00:00
}
ok := limiter . Go ( ctx , func ( ) {
defer func ( ) { _ = rh . Close ( ) } ( )
defer func ( ) { _ = wh . Abort ( ) } ( )
2022-06-01 07:19:09 +01:00
if readBufs != nil {
buf := readBufs . Get ( )
defer readBufs . Put ( buf )
rh = ulfs . NewBufferedReadHandle ( ctx , rh , buf )
}
2021-12-09 19:07:01 +00:00
var w io . Writer = wh
if bar != nil {
bar . SetTotal ( rh . Info ( ) . ContentLength ) . Start ( )
w = bar . NewProxyWriter ( w )
}
2022-04-22 12:40:38 +01:00
_ , err := sync2 . Copy ( ctx , w , rh )
2021-12-09 19:07:01 +00:00
if err == nil {
err = wh . Commit ( )
}
2022-04-22 12:40:38 +01:00
if err != nil {
// TODO: it would be also nice to use wh.Abort and rh.Close directly
// to avoid some of the waiting that's caused by sync2.Copy.
//
// However, some of the source / destination implementations don't seem
// to have concurrent safe API with that regards.
//
// Also, we may want to check that it actually helps, before implementing it.
2021-12-09 19:07:01 +00:00
2022-05-27 16:08:29 +01:00
addError ( errs . New ( "failed to %s part %d: %v" , copyVerb ( c . source , c . dest ) , i , err ) )
2022-05-27 14:16:58 +01:00
}
2021-12-09 19:07:01 +00:00
} )
if ! ok {
2022-05-27 14:16:58 +01:00
break
2021-12-09 19:07:01 +00:00
}
}
limiter . Wait ( )
2022-05-27 14:16:58 +01:00
// don't try to commit if any error occur
if len ( es ) == 0 {
es . Add ( dst . Commit ( ctx ) )
}
2021-12-09 19:07:01 +00:00
2022-05-27 14:16:58 +01:00
return errs . Wrap ( combineErrs ( es ) )
2021-12-09 19:07:01 +00:00
}
func parseRange ( r string ) ( offset , length int64 , err error ) {
r = strings . TrimPrefix ( strings . TrimSpace ( r ) , "bytes=" )
if r == "" {
return 0 , - 1 , nil
}
if strings . Contains ( r , "," ) {
return 0 , 0 , errs . New ( "invalid range: must be single range" )
}
idx := strings . Index ( r , "-" )
if idx < 0 {
return 0 , 0 , errs . New ( ` invalid range: no "-" ` )
}
start , end := strings . TrimSpace ( r [ : idx ] ) , strings . TrimSpace ( r [ idx + 1 : ] )
var starti , endi int64
if start != "" {
starti , err = strconv . ParseInt ( start , 10 , 64 )
if err != nil {
return 0 , 0 , errs . New ( "invalid range: %w" , err )
}
}
if end != "" {
endi , err = strconv . ParseInt ( end , 10 , 64 )
if err != nil {
return 0 , 0 , errs . New ( "invalid range: %w" , err )
}
}
switch {
case start == "" && end == "" :
return 0 , 0 , errs . New ( "invalid range" )
case start == "" :
return - endi , - 1 , nil
case end == "" :
return starti , - 1 , nil
case starti < 0 :
return 0 , 0 , errs . New ( "invalid range: negative start: %q" , start )
case starti > endi :
return 0 , 0 , errs . New ( "invalid range: %v > %v" , starti , endi )
default :
return starti , endi - starti + 1 , nil
}
}
2022-05-27 14:16:58 +01:00
// combineErrs makes a more readable error message from the errors group.
func combineErrs ( group errs . Group ) error {
if len ( group ) == 0 {
return nil
}
errstrings := make ( [ ] string , len ( group ) )
for i , err := range group {
errstrings [ i ] = err . Error ( )
}
return fmt . Errorf ( "%s" , strings . Join ( errstrings , "\n" ) )
}