Uplink C bindings part 3 (#2258)

* add object upload and download
Bryan White 2019-06-21 14:24:06 +02:00 committed by Alexander Leitner
parent a5baebfa65
commit 5f47b7028d
11 changed files with 351 additions and 28 deletions


@ -70,6 +70,12 @@ func (ctx *Context) CompileC(t *testing.T, file string, includes ...Include) str
args = append(args, "-I", filepath.Dir(inc.Header))
}
if inc.Library != "" {
if inc.Standard {
args = append(args,
"-l"+inc.Library,
)
continue
}
if runtime.GOOS == "windows" {
args = append(args,
"-L"+filepath.Dir(inc.Library),
@ -99,4 +105,5 @@ func (ctx *Context) CompileC(t *testing.T, file string, includes ...Include) str
type Include struct {
Header string
Library string
Standard bool
}


@ -85,7 +85,7 @@ func open_bucket(projectHandle C.ProjectRef, name *C.char, encryptionAccess C.En
var access uplink.EncryptionAccess
for i := range access.Key {
-access.Key[i] = byte(encryptionAccess.key[0])
+access.Key[i] = byte(encryptionAccess.key[i])
}
scope := project.scope.child()

lib/uplinkc/object.go (new file, 164 lines)

@ -0,0 +1,164 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
// #include "uplink_definitions.h"
import "C"
import (
"io"
"time"
"unsafe"
"storj.io/storj/lib/uplink"
)
type Upload struct {
scope
wc io.WriteCloser // destination writer for the object's data
}
// upload uploads a new object, if authorized.
//export upload
func upload(cBucket C.BucketRef, path *C.char, cOpts *C.UploadOptions, cErr **C.char) (uploader C.UploaderRef) {
bucket, ok := universe.Get(cBucket._handle).(*Bucket)
if !ok {
*cErr = C.CString("invalid bucket")
return
}
scope := bucket.scope.child()
var opts *uplink.UploadOptions
if cOpts != nil {
var metadata map[string]string
opts = &uplink.UploadOptions{
ContentType: C.GoString(cOpts.content_type),
Metadata: metadata,
Expires: time.Unix(int64(cOpts.expires), 0),
}
}
writeCloser, err := bucket.NewWriter(scope.ctx, C.GoString(path), opts)
if err != nil {
*cErr = C.CString(err.Error())
return
}
return C.UploaderRef{universe.Add(&Upload{
scope: scope,
wc: writeCloser,
})}
}
//export upload_write
func upload_write(uploader C.UploaderRef, bytes *C.uint8_t, length C.int, cErr **C.char) (writeLength C.int) {
upload, ok := universe.Get(uploader._handle).(*Upload)
if !ok {
*cErr = C.CString("invalid uploader")
return C.int(0)
}
buf := (*[1 << 30]byte)(unsafe.Pointer(bytes))[:length]
n, err := upload.wc.Write(buf)
if err == io.EOF {
return C.EOF
}
if err != nil {
*cErr = C.CString(err.Error())
}
return C.int(n)
}
//export upload_commit
func upload_commit(uploader C.UploaderRef, cErr **C.char) {
upload, ok := universe.Get(uploader._handle).(*Upload)
if !ok {
*cErr = C.CString("invalid uploader")
return
}
universe.Del(uploader._handle)
defer upload.cancel()
err := upload.wc.Close()
if err != nil {
*cErr = C.CString(err.Error())
return
}
}
type Download struct {
scope
rc interface {
io.Reader
io.Seeker
io.Closer
}
}
// download creates a downloader for the object at the given path, if authorized.
//export download
func download(bucketRef C.BucketRef, path *C.char, cErr **C.char) (downloader C.DownloaderRef) {
bucket, ok := universe.Get(bucketRef._handle).(*Bucket)
if !ok {
*cErr = C.CString("invalid bucket")
return
}
scope := bucket.scope.child()
rc, err := bucket.NewReader(scope.ctx, C.GoString(path))
if err != nil {
*cErr = C.CString(err.Error())
return
}
return C.DownloaderRef{universe.Add(&Download{
scope: scope,
rc: rc,
})}
}
//export download_read
func download_read(downloader C.DownloaderRef, bytes *C.uint8_t, length C.int, cErr **C.char) (readLength C.int) {
download, ok := universe.Get(downloader._handle).(*Download)
if !ok {
*cErr = C.CString("invalid downloader")
return C.int(0)
}
buf := (*[1 << 30]byte)(unsafe.Pointer(bytes))[:length]
n, err := download.rc.Read(buf)
if err == io.EOF {
return C.EOF
}
if err != nil {
*cErr = C.CString(err.Error())
}
return C.int(n)
}
//export download_close
func download_close(downloader C.DownloaderRef, cErr **C.char) {
download, ok := universe.Get(downloader._handle).(*Download)
if !ok {
*cErr = C.CString("invalid downloader")
return
}
universe.Del(downloader._handle)
defer download.cancel()
err := download.rc.Close()
if err != nil {
*cErr = C.CString(err.Error())
return
}
}
//export free_upload_opts
func free_upload_opts(uploadOpts *C.UploadOptions) {
C.free(unsafe.Pointer(uploadOpts.content_type))
uploadOpts.content_type = nil
}
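Taken together, these exports give a C caller a writer-style upload loop and a reader-style download loop. Below is a condensed sketch of the intended calling pattern, not documented API: bucket and err are assumed to come from open_bucket and the test helpers, and the content type, expiry, and buffer sizes are illustrative. A real caller would check *err after each call, as the tests do with require_noerror.

#include <time.h>
#include "uplink.h"

/* Hypothetical round trip: upload len bytes to path, then read them back. */
static void roundtrip(BucketRef bucket, char *path, uint8_t *data, size_t len, char **err) {
    UploadOptions opts = {
        "text/plain",        /* content_type */
        time(NULL) + 3600,   /* expires, in Unix seconds (converted via time.Unix on the Go side) */
    };

    UploaderRef uploader = upload(bucket, path, &opts, err);
    size_t sent = 0;
    while (sent < len) {
        int n = upload_write(uploader, data + sent, (int)(len - sent), err);
        if (n <= 0) {
            break; /* 0 or EOF: nothing more can be written */
        }
        sent += (size_t)n;
    }
    upload_commit(uploader, err);

    DownloaderRef downloader = download(bucket, path, err);
    uint8_t buf[4096];
    while (true) {
        int n = download_read(downloader, buf, (int)sizeof(buf), err);
        if (n == EOF || n <= 0) {
            break; /* EOF marks the end of the object */
        }
        /* consume n bytes of buf here */
    }
    download_close(downloader, err);
}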


@ -5,9 +5,11 @@ package main
// #include "uplink_definitions.h"
import "C"
import (
"unsafe"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/storj"
)
// Project is a scoped uplink.Project
@ -18,7 +20,7 @@ type Project struct {
//export open_project
// open_project opens project using uplink
-func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle C.APIKeyRef, cerr **C.char) C.ProjectRef {
+func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle C.APIKeyRef, cProjectOpts *C.ProjectOptions, cerr **C.char) C.ProjectRef {
uplink, ok := universe.Get(uplinkHandle._handle).(*Uplink)
if !ok {
*cerr = C.CString("invalid uplink")
@ -33,8 +35,16 @@ func open_project(uplinkHandle C.UplinkRef, satelliteAddr *C.char, apikeyHandle
scope := uplink.scope.child()
-// TODO: add project options argument
-project, err := uplink.OpenProject(scope.ctx, C.GoString(satelliteAddr), apikey, nil)
+var opts *libuplink.ProjectOptions
+if unsafe.Pointer(cProjectOpts) != nil {
+opts = &libuplink.ProjectOptions{}
+opts.Volatile.EncryptionKey = new(storj.Key)
+for i := range cProjectOpts.key {
+opts.Volatile.EncryptionKey[i] = byte(cProjectOpts.key[i])
+}
+}
+project, err := uplink.OpenProject(scope.ctx, C.GoString(satelliteAddr), apikey, opts)
if err != nil {
*cerr = C.CString(err.Error())
return C.ProjectRef{}
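The new cProjectOpts argument lets a C caller supply a project-level encryption key. A small sketch of the calling side, assuming uplink, satellite_addr, apikey, and err are already set up as in helpers.c (the wrapper name and the "hello" key are illustrative):

#include <string.h>
#include "uplink.h"

/* Hypothetical wrapper showing the new open_project signature. */
ProjectRef open_project_with_key(UplinkRef uplink, char *satellite_addr,
                                 APIKeyRef apikey, char **err) {
    ProjectOptions project_opts = {{0}};    /* key is a 32-byte buffer; unused bytes stay zero */
    memcpy(&project_opts.key, "hello", 5);  /* illustrative key, as in object_test.c */
    /* passing NULL instead of &project_opts keeps the previous no-options behavior */
    return open_project(uplink, satellite_addr, apikey, &project_opts, err);
}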


@ -31,6 +31,7 @@ int main(int argc, char *argv[])
char *apikeySerialized = serialize_api_key(apikey, err);
require_noerror(*err);
requiref(strcmp(apikeySerialized, apikeyStr) == 0,
"got invalid serialized %s expected %s\n", apikeySerialized, apikeyStr);
free(apikeySerialized);


@ -6,23 +6,23 @@
#include "require.h"
#include "uplink.h"
#include "helpers2.h"
#include "helpers.h"
void handle_project(ProjectRef project);
int main(int argc, char *argv[]) {
-with_test_project(&handle_project);
+with_test_project(&handle_project, NULL);
}
void handle_project(ProjectRef project) {
char *_err = "";
char **err = &_err;
char *bucket_names[] = {"TestBucket1", "TestBucket2", "TestBucket3", "TestBucket4"};
char *bucket_names[] = {"test-bucket1", "test-bucket2", "test-bucket3", "test-bucket4"};
int num_of_buckets = sizeof(bucket_names) / sizeof(bucket_names[0]);
// TODO: test with different bucket configs
-{// Create buckets
+{ // Create buckets
for (int i=0; i < num_of_buckets; i++) {
char *bucket_name = bucket_names[i];


@ -1,6 +1,9 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
#include <stdlib.h>
#include <time.h>
// test_bucket_config returns test bucket configuration.
BucketConfig test_bucket_config() {
BucketConfig config = {};
@ -8,7 +11,7 @@ BucketConfig test_bucket_config() {
config.path_cipher = 0;
config.encryption_parameters.cipher_suite = 1; // TODO: make a named const
-config.encryption_parameters.block_size = 4096;
+config.encryption_parameters.block_size = 2048;
config.redundancy_scheme.algorithm = 1; // TODO: make a named const
config.redundancy_scheme.share_size = 1024;
@ -21,7 +24,7 @@ BucketConfig test_bucket_config() {
}
// with_test_project opens default test project and calls handleProject callback.
-void with_test_project(void (*handleProject)(ProjectRef)) {
+void with_test_project(void (*handleProject)(ProjectRef), ProjectOptions *project_opts) {
char *_err = "";
char **err = &_err;
@ -48,7 +51,7 @@ void with_test_project(void (*handleProject)(ProjectRef)) {
{
// open a project
-ProjectRef project = open_project(uplink, satellite_addr, apikey, err);
+ProjectRef project = open_project(uplink, satellite_addr, apikey, project_opts, err);
require_noerror(*err);
requiref(project._handle != 0, "got empty project\n");
@ -70,3 +73,19 @@ void with_test_project(void (*handleProject)(ProjectRef)) {
requiref(internal_UniverseIsEmpty(), "universe is not empty\n");
}
void fill_random_data(uint8_t *buffer, size_t length) {
for(size_t i = 0; i < length; i++) {
buffer[i] = (uint8_t)i*31;
}
}
bool array_contains(char *item, char *array[], int array_size) {
for (int i = 0; i < array_size; i++) {
if(strcmp(array[i], item) == 0) {
return true;
}
}
return false;
}

lib/uplinkc/testdata/object_test.c (new file, 109 lines)

@ -0,0 +1,109 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
#include <string.h>
#include <stdlib.h>
#include "require.h"
#include "uplink.h"
#include "helpers.h"
void handle_project(ProjectRef project);
int main(int argc, char *argv[]) {
ProjectOptions opts = {{0}};
memcpy(&opts.key, "hello", 5);
with_test_project(&handle_project, &opts);
}
void handle_project(ProjectRef project) {
char *_err = "";
char **err = &_err;
char *bucket_name = "test-bucket";
EncryptionAccess access = {{0}};
memcpy(&access.key, "hello", 5);
char *object_paths[] = {"test-object1","test-object2","test-object3","test-object4"};
int num_of_objects = 4;
// NB: about +500 years from time of writing
int64_t future_expiration_timestamp = 17329017831;
{ // create buckets
BucketConfig config = test_bucket_config();
BucketInfo info = create_bucket(project, bucket_name, &config, err);
require_noerror(*err);
free_bucket_info(&info);
}
// open bucket
BucketRef bucket = open_bucket(project, bucket_name, access, err);
require_noerror(*err);
for(int i = 0; i < num_of_objects; i++) {
size_t data_len = 1024 * (i + 1) * (i + 1);
uint8_t *data = malloc(data_len);
fill_random_data(data, data_len);
{ // upload
UploadOptions opts = {
"text/plain",
future_expiration_timestamp,
};
UploaderRef uploader = upload(bucket, object_paths[i], &opts, err);
require_noerror(*err);
size_t uploaded = 0;
while (uploaded < data_len) {
int to_write_len = (data_len - uploaded > 256) ? 256 : data_len - uploaded;
int write_len = upload_write(uploader, (uint8_t *)data+uploaded, to_write_len, err);
require_noerror(*err);
if (write_len == 0) {
break;
}
uploaded += write_len;
}
upload_commit(uploader, err);
require_noerror(*err);
}
{ // download
DownloaderRef downloader = download(bucket, object_paths[i], err);
require_noerror(*err);
uint8_t downloadedData[data_len];
memset(downloadedData, '\0', data_len);
size_t downloadedTotal = 0;
uint64_t size_to_read = 256 + i;
while (true) {
uint64_t downloadedSize = download_read(downloader, &downloadedData[downloadedTotal], size_to_read, err);
require_noerror(*err);
if (downloadedSize == EOF) {
break;
}
downloadedTotal += downloadedSize;
}
download_close(downloader, err);
require_noerror(*err);
require(memcmp(data, downloadedData, data_len) == 0);
}
if (data != NULL) {
free(data);
}
}
close_bucket(bucket, err);
require_noerror(*err);
}


@ -6,11 +6,11 @@
#include "require.h"
#include "uplink.h"
#include "helpers2.h"
#include "helpers.h"
void handle_project(ProjectRef project)
{};
int main(int argc, char *argv[]) {
-with_test_project(&handle_project);
+with_test_project(&handle_project, NULL);
}


@ -35,6 +35,9 @@ func RunPlanet(t *testing.T, run func(ctx *testcontext.Context, planet *testplan
planet.Start(ctx)
// make sure nodes are refreshed in db
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
run(ctx, planet)
}


@ -3,11 +3,15 @@
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
typedef struct APIKey { long _handle; } APIKeyRef;
typedef struct Uplink { long _handle; } UplinkRef;
typedef struct Project { long _handle; } ProjectRef;
typedef struct Bucket { long _handle; } BucketRef;
typedef struct Downloader { long _handle; } DownloaderRef;
typedef struct Uploader { long _handle; } UploaderRef;
typedef struct UplinkConfig {
struct {
@ -17,6 +21,10 @@ typedef struct UplinkConfig {
} Volatile;
} UplinkConfig;
typedef struct ProjectOptions {
char key[32];
} ProjectOptions;
typedef struct EncryptionParameters {
uint8_t cipher_suite;
int32_t block_size;
@ -33,18 +41,15 @@ typedef struct RedundancyScheme {
typedef struct BucketInfo {
char *name;
int64_t created;
uint8_t path_cipher;
uint64_t segment_size;
EncryptionParameters encryption_parameters;
RedundancyScheme redundancy_scheme;
} BucketInfo;
typedef struct BucketConfig {
uint8_t path_cipher;
EncryptionParameters encryption_parameters;
RedundancyScheme redundancy_scheme;
} BucketConfig;
@ -64,3 +69,8 @@ typedef struct BucketList {
typedef struct EncryptionAccess {
char key[32];
} EncryptionAccess;
typedef struct UploadOptions {
char *content_type;
int64_t expires;
} UploadOptions;
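Because free_upload_opts (in object.go above) releases content_type with C's free(), that field should point to heap-allocated memory if the helper is going to be used. A short sketch under that assumption; the strdup'd content type and one-hour expiry are illustrative:

#include <string.h>
#include <time.h>
#include "uplink.h"

/* Builds UploadOptions whose content_type can later be released by free_upload_opts. */
UploadOptions make_upload_options(void) {
    UploadOptions opts = {
        strdup("application/octet-stream"), /* heap copy, so free_upload_opts can free() it */
        time(NULL) + 3600,                  /* expires, in Unix seconds */
    };
    return opts;
}

/* Usage sketch:
 *   UploadOptions opts = make_upload_options();
 *   UploaderRef uploader = upload(bucket, "path", &opts, err);
 *   ...
 *   free_upload_opts(&opts);
 */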