Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Changelog for NeoFS Node
- Treat only HALTed main transactions as successfully executed, retry the rest (#3868)
- `different object owner and session issuer` for stored objects (#3929)
- SearchV2 method returning zero result instead of "bad request" error for incorrect numeric filters (#3934)
- Garbage double-counting in metrics (#3933)

### Changed
- `object search` CLI command is now the same as `object searchv2` (#3931)
Expand Down
37 changes: 31 additions & 6 deletions cmd/neofs-lancet/internal/meta/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package meta

import (
"fmt"
"slices"

common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal"
blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -49,13 +51,36 @@ func removeFunc(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("can't init metabase: %w", err)
}

res, err := db.Delete(addrs)
if err != nil {
return fmt.Errorf("can't remove objects: %w", err)
var (
cnr cid.ID
oids []oid.ID
)

var delGroup = func(cnr cid.ID, oids []oid.ID) error {
res, err := db.Delete(cnr, oids)
if err != nil {
return fmt.Errorf("can't remove objects: %w", err)
}
for _, r := range res.RemovedObjects {
cmd.Println("Removed:", r.ID.String())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cmd.Println("Removed:", r.ID.String())
cmd.Println("Removed: ", r.ID.String())

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Println adds a separator by itself.

}
return nil
}

for _, r := range res.RemovedObjects {
cmd.Println("Removed:", r.Address.String())
slices.SortFunc(addrs, oid.Address.Compare)

for i := range addrs {
if cnr != addrs[i].Container() {
if len(oids) != 0 {
err = delGroup(cnr, oids)
if err != nil {
return err
}
}
cnr = addrs[i].Container()
oids = oids[:0]
}
oids = append(oids, addrs[i].Object())
}
return nil
return delGroup(cnr, oids)
}
9 changes: 5 additions & 4 deletions pkg/local_object_storage/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"slices"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

Expand All @@ -24,8 +25,8 @@ func (e *StorageEngine) Delete(addr oid.Address) error {
if e.blockErr != nil {
return e.blockErr
}
return e.processAddrDelete(addr, func(sh *shard.Shard, addrs []oid.Address) error {
return sh.MarkGarbage(addrs...)
return e.processAddrDelete(addr, func(sh *shard.Shard, cnr cid.ID, addrs []oid.ID) error {
return sh.MarkGarbage(cnr, addrs)
})
}

Expand Down Expand Up @@ -73,8 +74,8 @@ func (e *StorageEngine) DeleteRedundantCopies(addr oid.Address, shardIDs []strin
return nil
}

return e.processAddrDeleteOnShards(deleteShards, addr, func(sh *shard.Shard, addrs []oid.Address) error {
return sh.MarkGarbage(addrs...)
return e.processAddrDeleteOnShards(deleteShards, addr, func(sh *shard.Shard, cnr cid.ID, addrs []oid.ID) error {
return sh.MarkGarbage(cnr, addrs)
})
}

Expand Down
45 changes: 14 additions & 31 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (e *StorageEngine) InhumeContainer(cID cid.ID) error {
}

// processAddrDelete processes deletion (inhume or immediate delete) of an object by its address.
func (e *StorageEngine) processAddrDelete(addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error {
func (e *StorageEngine) processAddrDelete(addr oid.Address, deleteFunc func(*shard.Shard, cid.ID, []oid.ID) error) error {
return e.processAddrDeleteOnShards(e.sortedShards(addr.Object()), addr, deleteFunc)
}

func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error {
func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oid.Address, deleteFunc func(*shard.Shard, cid.ID, []oid.ID) error) error {
var (
children []oid.Address
children []oid.ID
err error
root bool
siNoLink *object.SplitInfo
Expand Down Expand Up @@ -126,13 +126,13 @@ func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oi
break
}

children = measuredObjsToAddresses(addr.Container(), link.Objects())
children = measuredObjsToOIDs(addr.Container(), link.Objects())
} else {
// v1 split
children = oIDsToAddresses(addr.Container(), linkObj.Children())
children = linkObj.Children()
}

children = append(children, linkAddr)
children = append(children, linkID)

break
}
Expand All @@ -141,7 +141,7 @@ func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oi
continue
}

err = deleteFunc(sh.Shard, []oid.Address{addr})
err = deleteFunc(sh.Shard, addr.Container(), []oid.ID{addr.Object()})
if err != nil {
if !errors.Is(err, logicerr.Error) {
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("addr", addr))
Expand All @@ -159,14 +159,14 @@ func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oi
}

var (
addrs = append(children, addr)
addrs = append(children, addr.Object())
ok bool
retErr error
)

// has not found the object on any shard, so delete on the most probable one
for _, sh := range shards {
err = deleteFunc(sh.Shard, addrs)
err = deleteFunc(sh.Shard, addr.Container(), addrs)
if err != nil {
var errLocked apistatus.ObjectLocked

Expand All @@ -193,7 +193,7 @@ func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oi
return retErr
}

func (e *StorageEngine) collectChildrenWithoutLink(addr oid.Address, si *object.SplitInfo) []oid.Address {
func (e *StorageEngine) collectChildrenWithoutLink(addr oid.Address, si *object.SplitInfo) []oid.ID {
e.log.Info("root object has no link object in split upload",
zap.Stringer("addrBeingInhumed", addr))

Expand All @@ -203,7 +203,7 @@ func (e *StorageEngine) collectChildrenWithoutLink(addr oid.Address, si *object.
case !firstID.IsZero():
res, err := e.collectRawWithAttribute(addr.Container(), object.FilterFirstSplitObject, firstID[:])
if err == nil {
res = append(res, oid.NewAddress(addr.Container(), firstID))
res = append(res, firstID)
return res
}
e.log.Warn("failed to collect objects with first ID", zap.Stringer("addrBeingInhumed", addr), zap.Error(err))
Expand Down Expand Up @@ -276,27 +276,10 @@ func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) {
}
}

func measuredObjsToAddresses(cID cid.ID, mm []object.MeasuredObject) []oid.Address {
var addr oid.Address
addr.SetContainer(cID)

res := make([]oid.Address, 0, len(mm))
func measuredObjsToOIDs(cID cid.ID, mm []object.MeasuredObject) []oid.ID {
res := make([]oid.ID, 0, len(mm))
for i := range mm {
addr.SetObject(mm[i].ObjectID())
res = append(res, addr)
}

return res
}

func oIDsToAddresses(cID cid.ID, oo []oid.ID) []oid.Address {
var addr oid.Address
addr.SetContainer(cID)

res := make([]oid.Address, 0, len(oo))
for _, o := range oo {
addr.SetObject(o)
res = append(res, addr)
res = append(res, mm[i].ObjectID())
}

return res
Expand Down
3 changes: 1 addition & 2 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ func (e *StorageEngine) broadcastObject(obj *object.Object, objBin []byte) error

if isFatal && len(goodShards) > 0 {
// Revert potential damage.
var addrs = []oid.Address{addr}
for _, sh := range goodShards {
var err = sh.Delete(addrs)
var err = sh.Delete(addr.Container(), []oid.ID{addr.Object()})
if err != nil {
e.log.Warn("failed to rollback incorrect put",
zap.Stringer("shard", sh.ID()),
Expand Down
18 changes: 9 additions & 9 deletions pkg/local_object_storage/engine/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *StorageEngine) Search(cnr cid.ID, fs []objectcore.SearchFilter, attrs [
return res[:count], c, nil
}

func (e *StorageEngine) collectRawWithAttribute(cnr cid.ID, attr string, val []byte) ([]oid.Address, error) {
func (e *StorageEngine) collectRawWithAttribute(cnr cid.ID, attr string, val []byte) ([]oid.ID, error) {
var (
err error
shards = e.unsortedShards()
Expand All @@ -123,14 +123,14 @@ func (e *StorageEngine) collectRawWithAttribute(cnr cid.ID, attr string, val []b
return nil, fmt.Errorf("shard %s: %w", sh.ID(), err)
}
}
return mergeOIDs(cnr, ids), nil
return mergeOIDs(ids), nil
}

// mergeOIDs merges given set of lists of object IDs into a single flat
// list of addresses in the same given container. ids are expected to be
// sorted and the result contains no duplicates from different original
// list (lists are expected to not contain any inner duplicates).
func mergeOIDs(cnr cid.ID, ids [][]oid.ID) []oid.Address {
// mergeOIDs merges given set of lists of object IDs into a single flat list.
// ids are expected to be sorted and the result contains no duplicates from
// different original lists (slices are expected to not contain any inner
// duplicates).
func mergeOIDs(ids [][]oid.ID) []oid.ID {
var numOfRes int

for i := range ids {
Expand All @@ -143,7 +143,7 @@ func mergeOIDs(cnr cid.ID, ids [][]oid.ID) []oid.Address {

var (
idx = make([]int, len(ids))
res = make([]oid.Address, 0, numOfRes)
res = make([]oid.ID, 0, numOfRes)
)

var haveUnhandledID = func() bool {
Expand Down Expand Up @@ -179,7 +179,7 @@ func mergeOIDs(cnr cid.ID, ids [][]oid.ID) []oid.Address {
}
}
}
res = append(res, oid.NewAddress(cnr, minID))
res = append(res, minID)
idx[minIdx]++
}

Expand Down
70 changes: 24 additions & 46 deletions pkg/local_object_storage/engine/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,100 +4,78 @@ import (
"slices"
"testing"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)

func TestMergeOIDs(t *testing.T) {
var cnr = cid.ID{0xff}

var equalAddresses = func(a, b oid.Address) bool {
var equalIDs = func(a, b oid.ID) bool {
return a.Compare(b) == 0
}

t.Run("nil list", func(t *testing.T) {
require.Nil(t, mergeOIDs(cnr, nil))
require.Nil(t, mergeOIDs(nil))
})
t.Run("empty list", func(t *testing.T) {
require.Nil(t, mergeOIDs(cnr, make([][]oid.ID, 0)))
require.Nil(t, mergeOIDs(make([][]oid.ID, 0)))
})
t.Run("empty elements", func(t *testing.T) {
require.Nil(t, mergeOIDs(cnr, make([][]oid.ID, 4)))
require.Nil(t, mergeOIDs(make([][]oid.ID, 4)))
})
t.Run("single list", func(t *testing.T) {
var (
expected []oid.Address
expected []oid.ID
ids = []oid.ID{{1}, {2}, {3}}
)
for _, id := range ids {
expected = append(expected, oid.NewAddress(cnr, id))
}
require.Equal(t, expected, mergeOIDs(cnr, [][]oid.ID{ids}))
expected = slices.Clone(ids)
require.Equal(t, expected, mergeOIDs([][]oid.ID{ids}))
})
t.Run("two lists", func(t *testing.T) {
var (
expected []oid.Address
expected []oid.ID
ids1 = []oid.ID{{1}, {2}, {3}}
ids2 = []oid.ID{{4}, {5}, {6}}
)
for _, id := range ids1 {
expected = append(expected, oid.NewAddress(cnr, id))
}
for _, id := range ids2 {
expected = append(expected, oid.NewAddress(cnr, id))
}
require.Equal(t, expected, mergeOIDs(cnr, [][]oid.ID{ids1, ids2}))
expected = slices.Concat(ids1, ids2)
require.Equal(t, expected, mergeOIDs([][]oid.ID{ids1, ids2}))
})
t.Run("two mixed lists", func(t *testing.T) {
var (
expected []oid.Address
expected []oid.ID
ids1 = []oid.ID{{1}, {3}, {5}}
ids2 = []oid.ID{{2}, {4}, {6}, {7}}
)
for _, id := range ids1 {
expected = append(expected, oid.NewAddress(cnr, id))
}
for _, id := range ids2 {
expected = append(expected, oid.NewAddress(cnr, id))
}
slices.SortFunc(expected, oid.Address.Compare)
require.Equal(t, expected, mergeOIDs(cnr, [][]oid.ID{ids1, ids2}))
expected = slices.Concat(ids1, ids2)
slices.SortFunc(expected, oid.ID.Compare)
require.Equal(t, expected, mergeOIDs([][]oid.ID{ids1, ids2}))
})
t.Run("two lists with dups", func(t *testing.T) {
var (
expected []oid.Address
expected []oid.ID
ids1 = []oid.ID{{1}, {2}, {3}}
ids2 = []oid.ID{{2}, {3}, {4}, {5}}
)
for _, id := range ids1 {
expected = append(expected, oid.NewAddress(cnr, id))
}
for _, id := range ids2 {
expected = append(expected, oid.NewAddress(cnr, id))
}
slices.SortFunc(expected, oid.Address.Compare)
expected = slices.CompactFunc(expected, equalAddresses)
expected = slices.Concat(ids1, ids2)
slices.SortFunc(expected, oid.ID.Compare)
expected = slices.CompactFunc(expected, equalIDs)
require.Len(t, expected, 5) // Ensure sort/compact.
require.Equal(t, expected, mergeOIDs(cnr, [][]oid.ID{ids1, ids2}))
require.Equal(t, expected, mergeOIDs([][]oid.ID{ids1, ids2}))
})
t.Run("four lists with dups", func(t *testing.T) {
var (
expected []oid.Address
expected []oid.ID
ids1 = []oid.ID{{1}, {2}, {3}}
ids2 = []oid.ID{{2}, {3}, {4}, {5}}
ids3 = []oid.ID{{3}, {4}, {5}}
ids4 = []oid.ID{{1}, {4}, {6}, {7}}
idz = [][]oid.ID{ids1, ids2, ids3, ids4}
)
for _, ids := range idz {
for _, id := range ids {
expected = append(expected, oid.NewAddress(cnr, id))
}
expected = append(expected, ids...)
}
slices.SortFunc(expected, oid.Address.Compare)
expected = slices.CompactFunc(expected, equalAddresses)
slices.SortFunc(expected, oid.ID.Compare)
expected = slices.CompactFunc(expected, equalIDs)
require.Len(t, expected, 7) // Ensure sort/compact.
require.Equal(t, expected, mergeOIDs(cnr, idz))
require.Equal(t, expected, mergeOIDs(idz))
})
}
Loading
Loading