From 5f47b7028d1074acdf44d514e3df0328278fb021 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 21 Jun 2019 14:24:06 +0200 Subject: [PATCH] Uplink C bindings part 3 (#2258) * add object upload and download --- internal/testcontext/compile.go | 11 +- lib/uplinkc/bucket.go | 2 +- lib/uplinkc/object.go | 164 ++++++++++++++++++ lib/uplinkc/project.go | 18 +- lib/uplinkc/testdata/apikey_test.c | 1 + lib/uplinkc/testdata/bucket_test.c | 8 +- .../testdata/{helpers2.h => helpers.h} | 25 ++- lib/uplinkc/testdata/object_test.c | 109 ++++++++++++ lib/uplinkc/testdata/project_test.c | 4 +- lib/uplinkc/testdata_test.go | 3 + lib/uplinkc/uplink_definitions.h | 34 ++-- 11 files changed, 351 insertions(+), 28 deletions(-) create mode 100644 lib/uplinkc/object.go rename lib/uplinkc/testdata/{helpers2.h => helpers.h} (78%) create mode 100644 lib/uplinkc/testdata/object_test.c diff --git a/internal/testcontext/compile.go b/internal/testcontext/compile.go index 0bc4d10f2..c3c63f6b4 100644 --- a/internal/testcontext/compile.go +++ b/internal/testcontext/compile.go @@ -70,6 +70,12 @@ func (ctx *Context) CompileC(t *testing.T, file string, includes ...Include) str args = append(args, "-I", filepath.Dir(inc.Header)) } if inc.Library != "" { + if inc.Standard { + args = append(args, + "-l"+inc.Library, + ) + continue + } if runtime.GOOS == "windows" { args = append(args, "-L"+filepath.Dir(inc.Library), @@ -97,6 +103,7 @@ func (ctx *Context) CompileC(t *testing.T, file string, includes ...Include) str // Include defines an includable library for gcc. type Include struct { - Header string - Library string + Header string + Library string + Standard bool } diff --git a/lib/uplinkc/bucket.go b/lib/uplinkc/bucket.go index c37634fe5..e0ed0427d 100644 --- a/lib/uplinkc/bucket.go +++ b/lib/uplinkc/bucket.go @@ -85,7 +85,7 @@ func open_bucket(projectHandle C.ProjectRef, name *C.char, encryptionAccess C.En var access uplink.EncryptionAccess for i := range access.Key { - access.Key[i] = byte(encryptionAccess.key[0]) + access.Key[i] = byte(encryptionAccess.key[i]) } scope := project.scope.child() diff --git a/lib/uplinkc/object.go b/lib/uplinkc/object.go new file mode 100644 index 000000000..4d8bbcac3 --- /dev/null +++ b/lib/uplinkc/object.go @@ -0,0 +1,164 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +// #include "uplink_definitions.h" +import "C" + +import ( + "io" + "time" + "unsafe" + + "storj.io/storj/lib/uplink" +) + +type Upload struct { + scope + wc io.WriteCloser // 🚽 +} + +// upload uploads a new object, if authorized. +//export upload +func upload(cBucket C.BucketRef, path *C.char, cOpts *C.UploadOptions, cErr **C.char) (downloader C.UploaderRef) { + bucket, ok := universe.Get(cBucket._handle).(*Bucket) + if !ok { + *cErr = C.CString("invalid bucket") + return + } + + scope := bucket.scope.child() + + var opts *uplink.UploadOptions + if cOpts != nil { + var metadata map[string]string + + opts = &uplink.UploadOptions{ + ContentType: C.GoString(cOpts.content_type), + Metadata: metadata, + Expires: time.Unix(int64(cOpts.expires), 0), + } + } + + writeCloser, err := bucket.NewWriter(scope.ctx, C.GoString(path), opts) + if err != nil { + *cErr = C.CString(err.Error()) + return + } + + return C.UploaderRef{universe.Add(&Upload{ + scope: scope, + wc: writeCloser, + })} +} + +//export upload_write +func upload_write(uploader C.UploaderRef, bytes *C.uint8_t, length C.int, cErr **C.char) (writeLength C.int) { + upload, ok := universe.Get(uploader._handle).(*Upload) + if !ok { + *cErr = C.CString("invalid uploader") + return C.int(0) + } + + buf := (*[1 << 30]byte)(unsafe.Pointer(bytes))[:length] + + n, err := upload.wc.Write(buf) + if err == io.EOF { + return C.EOF + } + + return C.int(n) +} + +//export upload_commit +func upload_commit(uploader C.UploaderRef, cErr **C.char) { + upload, ok := universe.Get(uploader._handle).(*Upload) + if !ok { + *cErr = C.CString("invalid uploader") + return + } + + universe.Del(uploader._handle) + defer upload.cancel() + + err := upload.wc.Close() + if err != nil { + *cErr = C.CString(err.Error()) + return + } +} + +type Download struct { + scope + rc interface { + io.Reader + io.Seeker + io.Closer + } +} + +// download returns an Object's data. A length of -1 will mean +// (Object.Size - offset). +//export download +func download(bucketRef C.BucketRef, path *C.char, cErr **C.char) (downloader C.DownloaderRef) { + bucket, ok := universe.Get(bucketRef._handle).(*Bucket) + if !ok { + *cErr = C.CString("invalid bucket") + return + } + + scope := bucket.scope.child() + + rc, err := bucket.NewReader(scope.ctx, C.GoString(path)) + if err != nil { + *cErr = C.CString(err.Error()) + return + } + + return C.DownloaderRef{universe.Add(&Download{ + scope: scope, + rc: rc, + })} +} + +//export download_read +func download_read(downloader C.DownloaderRef, bytes *C.uint8_t, length C.int, cErr **C.char) (readLength C.int) { + download, ok := universe.Get(downloader._handle).(*Download) + if !ok { + *cErr = C.CString("invalid downloader") + return C.int(0) + } + + buf := (*[1 << 30]byte)(unsafe.Pointer(bytes))[:length] + + n, err := download.rc.Read(buf) + if err == io.EOF { + return C.EOF + } + + return C.int(n) +} + +//export download_close +func download_close(downloader C.DownloaderRef, cErr **C.char) { + download, ok := universe.Get(downloader._handle).(*Download) + if !ok { + *cErr = C.CString("invalid downloader") + } + + universe.Del(downloader._handle) + defer download.cancel() + + err := download.rc.Close() + if err != nil { + *cErr = C.CString(err.Error()) + return + } +} + +//export free_upload_opts +func free_upload_opts(uploadOpts *C.UploadOptions) { + C.free(unsafe.Pointer(uploadOpts.content_type)) + uploadOpts.content_type = nil +} diff --git a/lib/uplinkc/project.go b/lib/uplinkc/project.go index 47b1d041d..f7837da73 100644 --- a/lib/uplinkc/project.go +++ b/lib/uplinkc/project.go @@ -5,9 +5,11 @@ package main // #include "uplink_definitions.h" import "C" - import ( + "unsafe" + libuplink "storj.io/storj/lib/uplink" + "storj.io/storj/pkg/storj" ) // Project is a scoped uplink.Project @@ -18,7 +20,7 @@ type Project struct { //export open_project // open_project opens project using uplink -func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle C.APIKeyRef, cerr **C.char) C.ProjectRef { +func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle C.APIKeyRef, cProjectOpts *C.ProjectOptions, cerr **C.char) C.ProjectRef { uplink, ok := universe.Get(uplinkHandle._handle).(*Uplink) if !ok { *cerr = C.CString("invalid uplink") @@ -33,8 +35,16 @@ func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle scope := uplink.scope.child() - // TODO: add project options argument - project, err := uplink.OpenProject(scope.ctx, C.GoString(satelliteAddr), apikey, nil) + var opts *libuplink.ProjectOptions + if unsafe.Pointer(cProjectOpts) != nil { + opts = &libuplink.ProjectOptions{} + opts.Volatile.EncryptionKey = new(storj.Key) + for i := range cProjectOpts.key { + opts.Volatile.EncryptionKey[i] = byte(cProjectOpts.key[i]) + } + } + + project, err := uplink.OpenProject(scope.ctx, C.GoString(satelliteAddr), apikey, opts) if err != nil { *cerr = C.CString(err.Error()) return C.ProjectRef{} diff --git a/lib/uplinkc/testdata/apikey_test.c b/lib/uplinkc/testdata/apikey_test.c index adc7d42be..85bfbf2fe 100644 --- a/lib/uplinkc/testdata/apikey_test.c +++ b/lib/uplinkc/testdata/apikey_test.c @@ -31,6 +31,7 @@ int main(int argc, char *argv[]) char *apikeySerialized = serialize_api_key(apikey, err); require_noerror(*err); + requiref(strcmp(apikeySerialized, apikeyStr) == 0, "got invalid serialized %s expected %s\n", apikeySerialized, apikeyStr); free(apikeySerialized); diff --git a/lib/uplinkc/testdata/bucket_test.c b/lib/uplinkc/testdata/bucket_test.c index 05fe2ec0f..10a9fc4d6 100644 --- a/lib/uplinkc/testdata/bucket_test.c +++ b/lib/uplinkc/testdata/bucket_test.c @@ -6,23 +6,23 @@ #include "require.h" #include "uplink.h" -#include "helpers2.h" +#include "helpers.h" void handle_project(ProjectRef project); int main(int argc, char *argv[]) { - with_test_project(&handle_project); + with_test_project(&handle_project, NULL); } void handle_project(ProjectRef project) { char *_err = ""; char **err = &_err; - char *bucket_names[] = {"TestBucket1", "TestBucket2", "TestBucket3", "TestBucket4"}; + char *bucket_names[] = {"test-bucket1", "test-bucket2", "test-bucket3", "test-bucket4"}; int num_of_buckets = sizeof(bucket_names) / sizeof(bucket_names[0]); // TODO: test with different bucket configs - {// Create buckets + { // Create buckets for (int i=0; i < num_of_buckets; i++) { char *bucket_name = bucket_names[i]; diff --git a/lib/uplinkc/testdata/helpers2.h b/lib/uplinkc/testdata/helpers.h similarity index 78% rename from lib/uplinkc/testdata/helpers2.h rename to lib/uplinkc/testdata/helpers.h index ca1133ae7..60adc5d05 100644 --- a/lib/uplinkc/testdata/helpers2.h +++ b/lib/uplinkc/testdata/helpers.h @@ -1,6 +1,9 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. +#include +#include + // test_bucket_config returns test bucket configuration. BucketConfig test_bucket_config() { BucketConfig config = {}; @@ -8,7 +11,7 @@ BucketConfig test_bucket_config() { config.path_cipher = 0; config.encryption_parameters.cipher_suite = 1; // TODO: make a named const - config.encryption_parameters.block_size = 4096; + config.encryption_parameters.block_size = 2048; config.redundancy_scheme.algorithm = 1; // TODO: make a named const config.redundancy_scheme.share_size = 1024; @@ -21,7 +24,7 @@ BucketConfig test_bucket_config() { } // with_test_project opens default test project and calls handleProject callback. -void with_test_project(void (*handleProject)(ProjectRef)) { +void with_test_project(void (*handleProject)(ProjectRef), ProjectOptions *project_opts) { char *_err = ""; char **err = &_err; @@ -48,7 +51,7 @@ void with_test_project(void (*handleProject)(ProjectRef)) { { // open a project - ProjectRef project = open_project(uplink, satellite_addr, apikey, err); + ProjectRef project = open_project(uplink, satellite_addr, apikey, project_opts, err); require_noerror(*err); requiref(project._handle != 0, "got empty project\n"); @@ -69,4 +72,20 @@ void with_test_project(void (*handleProject)(ProjectRef)) { } requiref(internal_UniverseIsEmpty(), "universe is not empty\n"); +} + +void fill_random_data(uint8_t *buffer, size_t length) { + for(size_t i = 0; i < length; i++) { + buffer[i] = (uint8_t)i*31; + } +} + +bool array_contains(char *item, char *array[], int array_size) { + for (int i = 0; i < array_size; i++) { + if(strcmp(array[i], item) == 0) { + return true; + } + } + + return false; } \ No newline at end of file diff --git a/lib/uplinkc/testdata/object_test.c b/lib/uplinkc/testdata/object_test.c new file mode 100644 index 000000000..11fde1d1b --- /dev/null +++ b/lib/uplinkc/testdata/object_test.c @@ -0,0 +1,109 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +#include +#include + +#include "require.h" +#include "uplink.h" +#include "helpers.h" + +void handle_project(ProjectRef project); + +int main(int argc, char *argv[]) { + ProjectOptions opts = {{0}}; + memcpy(&opts.key, "hello", 5); + + with_test_project(&handle_project, &opts); +} + +void handle_project(ProjectRef project) { + char *_err = ""; + char **err = &_err; + + char *bucket_name = "test-bucket"; + EncryptionAccess access = {{0}}; + memcpy(&access.key, "hello", 5); + + char *object_paths[] = {"test-object1","test-object2","test-object3","test-object4"}; + int num_of_objects = 4; + + // NB: about +500 years from time of writing + int64_t future_expiration_timestamp = 17329017831; + + { // create buckets + BucketConfig config = test_bucket_config(); + BucketInfo info = create_bucket(project, bucket_name, &config, err); + require_noerror(*err); + free_bucket_info(&info); + } + + // open bucket + BucketRef bucket = open_bucket(project, bucket_name, access, err); + require_noerror(*err); + + + for(int i = 0; i < num_of_objects; i++) { + size_t data_len = 1024 * (i + 1) * (i + 1); + uint8_t *data = malloc(data_len); + fill_random_data(data, data_len); + + { // upload + UploadOptions opts = { + "text/plain", + future_expiration_timestamp, + }; + + UploaderRef uploader = upload(bucket, object_paths[i], &opts, err); + require_noerror(*err); + + size_t uploaded = 0; + while (uploaded < data_len) { + int to_write_len = (data_len - uploaded > 256) ? 256 : data_len - uploaded; + int write_len = upload_write(uploader, (uint8_t *)data+uploaded, to_write_len, err); + require_noerror(*err); + + if (write_len == 0) { + break; + } + + uploaded += write_len; + } + + upload_commit(uploader, err); + require_noerror(*err); + } + + { // download + DownloaderRef downloader = download(bucket, object_paths[i], err); + require_noerror(*err); + + uint8_t downloadedData[data_len]; + memset(downloadedData, '\0', data_len); + size_t downloadedTotal = 0; + + uint64_t size_to_read = 256 + i; + while (true) { + uint64_t downloadedSize = download_read(downloader, &downloadedData[downloadedTotal], size_to_read, err); + require_noerror(*err); + + if (downloadedSize == EOF) { + break; + } + + downloadedTotal += downloadedSize; + } + + download_close(downloader, err); + require_noerror(*err); + require(memcmp(data, downloadedData, data_len) == 0); + } + + if (data != NULL) { + free(data); + } + } + + close_bucket(bucket, err); + require_noerror(*err); +} \ No newline at end of file diff --git a/lib/uplinkc/testdata/project_test.c b/lib/uplinkc/testdata/project_test.c index 73b2b5d70..08bccf9c9 100644 --- a/lib/uplinkc/testdata/project_test.c +++ b/lib/uplinkc/testdata/project_test.c @@ -6,11 +6,11 @@ #include "require.h" #include "uplink.h" -#include "helpers2.h" +#include "helpers.h" void handle_project(ProjectRef project) {}; int main(int argc, char *argv[]) { - with_test_project(&handle_project); + with_test_project(&handle_project, NULL); } diff --git a/lib/uplinkc/testdata_test.go b/lib/uplinkc/testdata_test.go index c43f3bb1a..bf8be9fc5 100644 --- a/lib/uplinkc/testdata_test.go +++ b/lib/uplinkc/testdata_test.go @@ -35,6 +35,9 @@ func RunPlanet(t *testing.T, run func(ctx *testcontext.Context, planet *testplan planet.Start(ctx) + // make sure nodes are refreshed in db + planet.Satellites[0].Discovery.Service.Refresh.TriggerWait() + run(ctx, planet) } diff --git a/lib/uplinkc/uplink_definitions.h b/lib/uplinkc/uplink_definitions.h index 1cd4166b2..89e06db5f 100644 --- a/lib/uplinkc/uplink_definitions.h +++ b/lib/uplinkc/uplink_definitions.h @@ -3,11 +3,15 @@ #include #include #include +#include +#include -typedef struct APIKey { long _handle; } APIKeyRef; -typedef struct Uplink { long _handle; } UplinkRef; -typedef struct Project { long _handle; } ProjectRef; -typedef struct Bucket { long _handle; } BucketRef; +typedef struct APIKey { long _handle; } APIKeyRef; +typedef struct Uplink { long _handle; } UplinkRef; +typedef struct Project { long _handle; } ProjectRef; +typedef struct Bucket { long _handle; } BucketRef; +typedef struct Downloader { long _handle; } DownloaderRef; +typedef struct Uploader { long _handle; } UploaderRef; typedef struct UplinkConfig { struct { @@ -17,6 +21,10 @@ typedef struct UplinkConfig { } Volatile; } UplinkConfig; +typedef struct ProjectOptions { + char key[32]; +} ProjectOptions; + typedef struct EncryptionParameters { uint8_t cipher_suite; int32_t block_size; @@ -32,19 +40,16 @@ typedef struct RedundancyScheme { } RedundancyScheme; typedef struct BucketInfo { - char *name; - - int64_t created; - uint8_t path_cipher; - uint64_t segment_size; - + char *name; + int64_t created; + uint8_t path_cipher; + uint64_t segment_size; EncryptionParameters encryption_parameters; RedundancyScheme redundancy_scheme; } BucketInfo; typedef struct BucketConfig { - uint8_t path_cipher; - + uint8_t path_cipher; EncryptionParameters encryption_parameters; RedundancyScheme redundancy_scheme; } BucketConfig; @@ -64,3 +69,8 @@ typedef struct BucketList { typedef struct EncryptionAccess { char key[32]; } EncryptionAccess; + +typedef struct UploadOptions { + char *content_type; + int64_t expires; +} UploadOptions;