Delete pending audit entries from containment DB when files get deleted (#2050)

This commit is contained in:
Fadila 2019-05-24 21:56:08 +02:00 committed by Maximillian von Briesen
parent 2cdc55d345
commit 8f078d2841
2 changed files with 17 additions and 2 deletions

View File

@ -48,12 +48,18 @@ type Revocations interface {
GetByProjectID(ctx context.Context, projectID uuid.UUID) ([][]byte, error)
}
// Containment is a copy/paste of containment interface to avoid import cycle error
type Containment interface {
Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}
// Endpoint metainfo endpoint
type Endpoint struct {
log *zap.Logger
metainfo *Service
orders *orders.Service
cache *overlay.Cache
containment Containment
apiKeys APIKeys
storagenodeAccountingDB accounting.StoragenodeAccounting
projectAccountingDB accounting.ProjectAccounting
@ -62,17 +68,17 @@ type Endpoint struct {
}
// NewEndpoint creates new metainfo endpoint instance
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache,
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, containment Containment,
apiKeys APIKeys, sdb accounting.StoragenodeAccounting,
pdb accounting.ProjectAccounting, liveAccounting live.Service,
maxAlphaUsage memory.Size) *Endpoint {
// TODO do something with too many params
return &Endpoint{
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
containment: containment,
apiKeys: apiKeys,
storagenodeAccountingDB: sdb,
projectAccountingDB: pdb,
@ -419,6 +425,7 @@ func (endpoint *Endpoint) DeleteSegment(ctx context.Context, req *pb.SegmentDele
}
err = endpoint.metainfo.Delete(path)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
@ -429,6 +436,13 @@ func (endpoint *Endpoint) DeleteSegment(ctx context.Context, req *pb.SegmentDele
return nil, status.Errorf(codes.Internal, err.Error())
}
for _, piece := range pointer.GetRemote().GetRemotePieces() {
_, err := endpoint.containment.Delete(ctx, piece.NodeId)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
}
bucketID := createBucketID(keyInfo.ProjectID, req.Bucket)
limits, err := endpoint.orders.CreateDeleteOrderLimits(ctx, uplinkIdentity, bucketID, pointer)
if err != nil {

View File

@ -357,6 +357,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
peer.DB.Containment(),
peer.DB.Console().APIKeys(),
peer.DB.StoragenodeAccounting(),
peer.DB.ProjectAccounting(),