uplink/satellite: fix for case when inline segment is last one (#3062)
* uplink/satellite: fix when inline seg is last one * review comments
This commit is contained in:
parent
ccdd435610
commit
1c72e80e40
@ -232,11 +232,16 @@ waitformore:
|
||||
// handlePointer deals with a pointer for a single observer
|
||||
// if there is some error on the observer, handle the error and return false. Otherwise, return true
|
||||
func handlePointer(ctx context.Context, observer *observerContext, path ScopedPath, isLastSegment bool, pointer *pb.Pointer) bool {
|
||||
if pointer.GetRemote() != nil {
|
||||
switch pointer.GetType() {
|
||||
case pb.Pointer_REMOTE:
|
||||
if observer.HandleError(observer.RemoteSegment(ctx, path, pointer)) {
|
||||
return false
|
||||
}
|
||||
} else if observer.HandleError(observer.InlineSegment(ctx, path, pointer)) {
|
||||
case pb.Pointer_INLINE:
|
||||
if observer.HandleError(observer.InlineSegment(ctx, path, pointer)) {
|
||||
return false
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
if isLastSegment {
|
||||
|
@ -1082,6 +1082,11 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
|
||||
return nil, status.Errorf(codes.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
|
||||
}
|
||||
|
||||
if lastSegmentPointer.Remote == nil {
|
||||
lastSegmentPointer.Remote = &pb.RemoteSegment{}
|
||||
}
|
||||
// RS is set always for last segment to emulate RS per object
|
||||
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
|
||||
lastSegmentPointer.Metadata = req.EncryptedMetadata
|
||||
|
||||
err = endpoint.metainfo.Delete(ctx, lastSegmentPath)
|
||||
@ -1158,6 +1163,34 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
|
||||
if pointer.Remote != nil {
|
||||
object.RedundancyScheme = pointer.Remote.Redundancy
|
||||
|
||||
// NumberOfSegments == 0 - pointer with encrypted num of segments
|
||||
// NumberOfSegments > 1 - pointer with unencrypted num of segments and multiple segments
|
||||
} else if streamMeta.NumberOfSegments == 0 || streamMeta.NumberOfSegments > 1 {
|
||||
// workaround
|
||||
// The new metainfo API redundancy scheme is on object level (not per segment).
|
||||
// Because of that, RS is always taken from the last segment.
|
||||
// The old implementation saves RS per segment, and in some cases
|
||||
// when the remote file's last segment is an inline segment, we end up
|
||||
// missing an RS scheme. This loop will search for RS in segments other than the last one.
|
||||
|
||||
index := int64(0)
|
||||
for {
|
||||
pointer, _, err = endpoint.getPointer(ctx, keyInfo.ProjectID, index, req.Bucket, req.EncryptedPath)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
break
|
||||
}
|
||||
|
||||
endpoint.log.Error("unable to get pointer", zap.Error(err))
|
||||
return nil, status.Error(codes.Internal, "unable to get object")
|
||||
}
|
||||
if pointer.Remote != nil {
|
||||
object.RedundancyScheme = pointer.Remote.Redundancy
|
||||
break
|
||||
}
|
||||
index++
|
||||
}
|
||||
}
|
||||
|
||||
return &pb.ObjectGetResponse{
|
||||
@ -1541,12 +1574,18 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
|
||||
// that will be affected is our per-project bandwidth and storage limits.
|
||||
}
|
||||
|
||||
metadata, err := proto.Marshal(&pb.SegmentMeta{
|
||||
EncryptedKey: req.EncryptedKey,
|
||||
KeyNonce: req.EncryptedKeyNonce.Bytes(),
|
||||
})
|
||||
|
||||
pointer := &pb.Pointer{
|
||||
Type: pb.Pointer_INLINE,
|
||||
SegmentSize: inlineUsed,
|
||||
CreationDate: streamID.CreationDate,
|
||||
ExpirationDate: streamID.ExpirationDate,
|
||||
InlineSegment: req.EncryptedInlineData,
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
||||
err = endpoint.metainfo.Put(ctx, path, pointer)
|
||||
@ -1840,6 +1879,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
||||
}
|
||||
segmentMeta = streamMeta.LastSegmentMeta
|
||||
} else {
|
||||
segmentMeta = &pb.SegmentMeta{}
|
||||
err = proto.Unmarshal(pointer.Metadata, segmentMeta)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
@ -145,7 +145,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
|
||||
// TODO figure out how to reduce duplicate code between here and checkerObs.RemoteSegment
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
remote := pointer.GetRemote()
|
||||
if remote == nil {
|
||||
if pointer.GetType() == pb.Pointer_INLINE || remote == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -96,6 +96,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
// when number of healthy piece is less than minimum required number of piece in redundancy,
|
||||
// the piece is considered irreparable and will be put into irreparable DB
|
||||
pointer := &pb.Pointer{
|
||||
Type: pb.Pointer_REMOTE,
|
||||
CreationDate: time.Now(),
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
@ -146,6 +147,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
|
||||
|
||||
// make the pointer repairable
|
||||
pointer = &pb.Pointer{
|
||||
Type: pb.Pointer_REMOTE,
|
||||
CreationDate: time.Now(),
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
@ -197,6 +199,7 @@ func makePointer(t *testing.T, planet *testplanet.Planet, pointerPath string, cr
|
||||
minReq, repairThreshold = numOfStorageNodes-1, numOfStorageNodes+1
|
||||
}
|
||||
pointer := &pb.Pointer{
|
||||
Type: pb.Pointer_REMOTE,
|
||||
CreationDate: time.Now(),
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
|
@ -13,10 +13,11 @@ setup(){
|
||||
random_bytes_file () {
|
||||
size=$1
|
||||
output=$2
|
||||
dd if=/dev/urandom of="$output" count=1 bs="$size" >/dev/null 2>&1
|
||||
head -c $size </dev/urandom > $output
|
||||
}
|
||||
random_bytes_file 2x1024 "$TEST_FILES_DIR/small-upload-testfile" # create 2kb file of random bytes (inline)
|
||||
random_bytes_file 5x1024x1024 "$TEST_FILES_DIR/big-upload-testfile" # create 5mb file of random bytes (remote)
|
||||
random_bytes_file "2K" "$TEST_FILES_DIR/small-upload-testfile" # create 2kb file of random bytes (inline)
|
||||
random_bytes_file "5M" "$TEST_FILES_DIR/big-upload-testfile" # create 5mb file of random bytes (remote)
|
||||
random_bytes_file "128M" "$TEST_FILES_DIR/multisegment-upload-testfile" # create 128mb file of random bytes (remote)
|
||||
|
||||
echo "setup test successfully"
|
||||
}
|
||||
@ -26,11 +27,13 @@ if [[ "$1" == "upload" ]]; then
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" mb "sj://$BUCKET/"
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "$TEST_FILES_DIR/small-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "$TEST_FILES_DIR/big-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "$TEST_FILES_DIR/small-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "$TEST_FILES_DIR/big-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "$TEST_FILES_DIR/multisegment-upload-testfile" "sj://$BUCKET/"
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "sj://$BUCKET/small-upload-testfile" "$RELEASE_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "sj://$BUCKET/big-upload-testfile" "$RELEASE_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/small-upload-testfile" "$RELEASE_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/big-upload-testfile" "$RELEASE_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/multisegment-upload-testfile" "$RELEASE_DST_DIR"
|
||||
|
||||
if cmp "$TEST_FILES_DIR/small-upload-testfile" "$RELEASE_DST_DIR/small-upload-testfile"
|
||||
then
|
||||
@ -47,11 +50,24 @@ if [[ "$1" == "upload" ]]; then
|
||||
echo "upload test on release tag: big upload testfile does not match uploaded file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if cmp "$TEST_FILES_DIR/multisegment-upload-testfile" "$RELEASE_DST_DIR/multisegment-upload-testfile"
|
||||
then
|
||||
echo "upload test on release tag: multisegment upload testfile matches uploaded file"
|
||||
else
|
||||
echo "upload test on release tag: multisegment upload testfile does not match uploaded file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
rm "$RELEASE_DST_DIR/small-upload-testfile"
|
||||
rm "$RELEASE_DST_DIR/big-upload-testfile"
|
||||
rm "$RELEASE_DST_DIR/multisegment-upload-testfile"
|
||||
fi
|
||||
|
||||
if [[ "$1" == "download" ]]; then
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "sj://$BUCKET/small-upload-testfile" "$BRANCH_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp "sj://$BUCKET/big-upload-testfile" "$BRANCH_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/small-upload-testfile" "$BRANCH_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/big-upload-testfile" "$BRANCH_DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" cp --progress=false "sj://$BUCKET/multisegment-upload-testfile" "$BRANCH_DST_DIR"
|
||||
|
||||
if cmp "$TEST_FILES_DIR/small-upload-testfile" "$BRANCH_DST_DIR/small-upload-testfile"
|
||||
then
|
||||
@ -68,10 +84,23 @@ if [[ "$1" == "download" ]]; then
|
||||
echo "download test on current branch: big upload testfile does not match uploaded file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if cmp "$TEST_FILES_DIR/multisegment-upload-testfile" "$BRANCH_DST_DIR/multisegment-upload-testfile"
|
||||
then
|
||||
echo "download test on current branch: multisegment upload testfile matches uploaded file"
|
||||
else
|
||||
echo "download test on current branch: multisegment upload testfile does not match uploaded file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
rm "$BRANCH_DST_DIR/small-upload-testfile"
|
||||
rm "$BRANCH_DST_DIR/big-upload-testfile"
|
||||
rm "$BRANCH_DST_DIR/multisegment-upload-testfile"
|
||||
fi
|
||||
|
||||
if [[ "$1" == "cleanup" ]]; then
|
||||
uplink --config-dir "$GATEWAY_0_DIR" rm "sj://$BUCKET/small-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" rm "sj://$BUCKET/big-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" rm "sj://$BUCKET/multisegment-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" rb "sj://$BUCKET"
|
||||
fi
|
||||
|
@ -19,29 +19,33 @@ mkdir -p "$SRC_DIR" "$DST_DIR"
|
||||
random_bytes_file () {
|
||||
size=$1
|
||||
output=$2
|
||||
dd if=/dev/urandom of="$output" count=1 bs="$size" >/dev/null 2>&1
|
||||
head -c $size </dev/urandom > $output
|
||||
}
|
||||
|
||||
random_bytes_file 2x1024 "$SRC_DIR/small-upload-testfile" # create 2kb file of random bytes (inline)
|
||||
random_bytes_file 5x1024x1024 "$SRC_DIR/big-upload-testfile" # create 5mb file of random bytes (remote)
|
||||
random_bytes_file 5x1024x1024 "$SRC_DIR/multisegment-upload-testfile" # create 5mb file of random bytes (remote)
|
||||
random_bytes_file "2KiB" "$SRC_DIR/small-upload-testfile" # create 2kb file of random bytes (inline)
|
||||
random_bytes_file "5MiB" "$SRC_DIR/big-upload-testfile" # create 5mb file of random bytes (remote)
|
||||
random_bytes_file "128MiB" "$SRC_DIR/multisegment-upload-testfile" # create 128mb file of random bytes (remote)
|
||||
random_bytes_file "73MiB" "$SRC_DIR/diff-size-segments" # create 73mb file of random bytes (remote)
|
||||
|
||||
UPLINK_DEBUG_ADDR=""
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" mb "sj://$BUCKET/"
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/small-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/big-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" --client.segment-size "1MiB" cp "$SRC_DIR/multisegment-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/small-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/big-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/multisegment-upload-testfile" "sj://$BUCKET/"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/diff-size-segments" "sj://$BUCKET/"
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/big-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/big-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/diff-size-segments" "$DST_DIR"
|
||||
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/small-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/big-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/multisegment-upload-testfile"
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/diff-size-segments"
|
||||
|
||||
uplink --config-dir "$GATEWAY_0_DIR" --debug.addr "$UPLINK_DEBUG_ADDR" ls "sj://$BUCKET"
|
||||
|
||||
@ -71,6 +75,15 @@ else
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if cmp "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segments"
|
||||
then
|
||||
echo "diff-size-segments testfile matches uploaded file"
|
||||
else
|
||||
echo "diff-size-segments testfile does not match uploaded file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
# check if all data files were removed
|
||||
# FILES=$(find "$STORAGENODE_0_DIR/../" -type f -path "*/blob/*" ! -name "info.*")
|
||||
# if [ -z "$FILES" ];
|
||||
|
@ -161,7 +161,8 @@ func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segment
|
||||
}
|
||||
|
||||
switch {
|
||||
case len(info.EncryptedInlineData) != 0:
|
||||
// no order limits also means its inline segment
|
||||
case len(info.EncryptedInlineData) != 0 || len(limits) == 0:
|
||||
return ranger.ByteRanger(info.EncryptedInlineData), info.SegmentEncryption, nil
|
||||
default:
|
||||
needed := CalcNeededNodes(objectRS)
|
||||
|
Loading…
Reference in New Issue
Block a user