Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Changelog for NeoFS Node
- `object search` CLI command is now the same as `object searchv2` (#3931)
- Optimized integer comparison in SearchV2 (#3938)
- Optimized number of nodes for SEARCH in EC containers (#3940)
- Optimized post-initial placement replication (#3923)

### Removed
- `policer.max_workers` configuration (#3920)
Expand Down
29 changes: 28 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,32 @@ type objectSvc struct {
delete *deletesvc.Service
}

// putPostInitialPlacementReplicator adapts the node's replicator to the
// putsvc.PostPlacementReplicator interface: it enqueues asynchronous
// replication tasks for objects that were stored under a relaxed initial
// placement and must still reach the full placement policy.
type putPostInitialPlacementReplicator struct {
log *zap.Logger
replicator *replicator.Replicator
}

// HandlePostPlacement enqueues an asynchronous replication task bringing obj
// to the given nodes. For an EC part a single copy is requested (one node per
// part); otherwise one copy per listed node. Enqueue failures are logged and
// otherwise ignored.
func (r putPostInitialPlacementReplicator) HandlePostPlacement(obj *object.Object, nodes []netmapsdk.NodeInfo) {
	expected := uint32(len(nodes))
	// An EC part is stored on exactly one node, so one copy suffices.
	if pi, err := iec.GetPartInfo(*obj); err == nil && pi.Index >= 0 {
		expected = 1
	}

	var task replicator.Task
	task.SetObject(obj)
	task.SetObjectAddress(obj.Address())
	task.SetNodes(nodes)
	task.SetCopiesNumber(expected)

	if err := r.replicator.EnqueueTask(task); err != nil {
		r.log.Warn("could not enqueue post-placement replication",
			zap.Stringer("object", obj.Address()),
			zap.Uint32("expected", expected),
			zap.Error(err),
		)
	}
}

func (c *cfg) MaxObjectSize() uint64 {
sz, err := c.nCli.MaxObjectSize()
if err != nil {
Expand Down Expand Up @@ -212,7 +238,7 @@ func initObjectService(c *cfg) {
policer.WithBoostMultiplier(c.appCfg.Policer.BoostMultiplier),
)

c.workers = append(c.workers, c.policer)
c.workers = append(c.workers, c.policer, c.replicator)

nnsResolver := nns.NewResolver(c.cli)

Expand Down Expand Up @@ -246,6 +272,7 @@ func initObjectService(c *cfg) {
putsvc.WithContainerSource(c.cnrSrc),
putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithRemoteWorkerPool(c.cfgObject.pool.putRemote),
putsvc.WithPostPlacementReplicator(putPostInitialPlacementReplicator{log: c.log, replicator: c.replicator}),
putsvc.WithLogger(c.log),
putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)),
putsvc.WithTombstoneVerifier(tombstone.NewVerifier(os)),
Expand Down
128 changes: 123 additions & 5 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type distributedTarget struct {
ecPart iec.PartInfo

initialPolicy *netmap.InitialPlacementPolicy

postPlacementReplicator PostPlacementReplicator
}

type nodeDesc struct {
Expand Down Expand Up @@ -190,6 +192,7 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)
// TODO: handle rules in parallel. https://github.com/nspcc-dev/neofs-node/issues/3503

repRules := t.containerNodes.PrimaryCounts()
fullRepRules := repRules
ecRules := t.containerNodes.ECRules()
if typ := obj.Type(); typ == object.TypeTombstone || typ == object.TypeLock || typ == object.TypeLink || len(obj.Children()) > 0 {
broadcast := typ != object.TypeLock || !t.localOnly
Expand All @@ -213,17 +216,21 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)
initial := t.initialPolicy != nil

if t.ecPart.RuleIndex >= 0 { // already encoded EC part
total := int(ecRules[t.ecPart.RuleIndex].DataPartNum + ecRules[t.ecPart.RuleIndex].ParityPartNum)
nodes := objNodeLists[len(repRules)+t.ecPart.RuleIndex]

// part info should already be verified, so we don't prevent out-of-range panic here
if initial && len(t.initialPolicy.ReplicaLimits()) != 0 && t.initialPolicy.ReplicaLimits()[len(repRules)+t.ecPart.RuleIndex] == 0 {
// Client can seal objects himself, but he is unlikely to enforce placement policies.
// Using SDK slicer as an example, the object is PUT to any SN.
// But this object should still not be saved according to initial policy.
// So, this is no-op.
// But this object should still be replicated according to the full policy.
postPlacementNodes := ecNodesForPart(nodes, t.ecPart.Index, total)
if len(postPlacementNodes) > 0 {
t.postPlacementReplicator.HandlePostPlacement(&obj, postPlacementNodes)
}
return nil
}

total := int(ecRules[t.ecPart.RuleIndex].DataPartNum + ecRules[t.ecPart.RuleIndex].ParityPartNum)
nodes := objNodeLists[len(repRules)+t.ecPart.RuleIndex]
return t.saveECPart(obj, encObj, t.ecPart.RuleIndex, t.ecPart.Index, total, nodes, &t.metaCollection)
}

Expand All @@ -240,9 +247,12 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)
var maxReplicas uint
var ecLimits []uint32
var ruleOrder []int
var appliedECRules []bool
Comment thread
carpawell marked this conversation as resolved.
ruleNum := len(repRules) + len(ecRules)

if initial {
appliedECRules = make([]bool, len(ecRules))

initialLimits := t.initialPolicy.ReplicaLimits()
if t.sessionSigner == nil { // same as above
if len(initialLimits) < len(repRules) {
Expand Down Expand Up @@ -331,6 +341,9 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)
t.placementIterator.log.Info("PUT by EC rule failure", zap.Stringer("object", obj.Address()), zap.Error(err))
return false, nil
}
if appliedECRules != nil {
appliedECRules[ecRuleIdx] = true
}
if maxReplicas > 0 {
leftReplicas--
return leftReplicas == 0, nil
Expand Down Expand Up @@ -428,12 +441,76 @@ nextRule:
}

if len(repRules) > 0 {
return t.submitMetaCollection(obj.Address(), &t.metaCollection)
err = t.submitMetaCollection(obj.Address(), &t.metaCollection)
if err != nil {
return err
}
}

if initial {
if len(fullRepRules) > 0 && repProg != nil {
t.replicateRemainingPrimaryNodes(obj, objNodeLists[:len(fullRepRules)], fullRepRules, repProg)
}
if t.sessionSigner != nil && len(ecRules) > 0 {
t.replicateRemainingECRules(obj, ecRules, objNodeLists[len(fullRepRules):], appliedECRules)
}
}
return nil
}

// replicateRemainingPrimaryNodes hands obj to the post-placement replicator
// for every primary node that has not confirmed storing it yet; no-op when
// all primaries already succeeded.
func (t *distributedTarget) replicateRemainingPrimaryNodes(obj object.Object, nodeLists [][]netmap.NodeInfo, repRules []uint, prog *repProgress) {
	if left := prog.remainingPrimaryNodes(nodeLists, repRules); len(left) > 0 {
		t.postPlacementReplicator.HandlePostPlacement(&obj, left)
	}
}

// replicateRemainingECRules encodes and replicates obj's EC parts for every
// rule that was not applied during the initial (relaxed) placement. Part
// objects are formed with the session signer and handed to the post-placement
// replicator; encoding/forming failures are logged per rule/part and do not
// abort processing of the remaining rules or parts.
func (t *distributedTarget) replicateRemainingECRules(obj object.Object, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo, applied []bool) {
	// Cache encoding results keyed by rule value: identical rules at
	// different indices reuse the same payload split.
	encodedByRule := make(map[iec.Rule][][]byte, len(ecRules))

	for ruleIdx := range ecRules {
		if applied[ruleIdx] {
			// Rule already satisfied during PUT itself.
			continue
		}

		payloadParts, ok := encodedByRule[ecRules[ruleIdx]]
		if !ok {
			var err error
			payloadParts, err = iec.Encode(ecRules[ruleIdx], obj.Payload())
			if err != nil {
				t.placementIterator.log.Info("failed to encode object for post-placement EC replication",
					zap.Stringer("object", obj.Address()),
					zap.Int("rule_idx", ruleIdx),
					zap.Stringer("rule", ecRules[ruleIdx]),
					zap.Error(err))
				continue
			}
			encodedByRule[ecRules[ruleIdx]] = payloadParts
		}

		totalParts := len(payloadParts)
		for partIdx := range payloadParts {
			partObj, err := formObjectForECPart(t.sessionSigner, obj, ruleIdx, partIdx, payloadParts)
			if err != nil {
				t.placementIterator.log.Info("failed to form EC part object for post-placement replication",
					zap.Stringer("object", obj.Address()),
					zap.Int("rule_idx", ruleIdx),
					zap.Int("part_idx", partIdx),
					zap.Error(err))
				continue
			}

			// No candidate nodes for this part — nothing to replicate.
			nodes := ecNodesForPart(nodeLists[ruleIdx], partIdx, totalParts)
			if len(nodes) == 0 {
				continue
			}
			t.postPlacementReplicator.HandlePostPlacement(&partObj, nodes)
		}
	}
}

func (t *distributedTarget) resetMetaCollection() {
// this field is reused for sliced objects of the same container with
// the same placement policy; placement's len must be kept the same, do
Expand Down Expand Up @@ -719,6 +796,47 @@ func newRepProgress(nodeLists [][]netmap.NodeInfo) *repProgress {
}
}

// remainingPrimaryNodes returns the primary nodes (per repRules) across all
// placement lists that have not yet reported a successful store, deduplicated
// by public key. Reads node results under the shared read lock.
func (p *repProgress) remainingPrimaryNodes(nodeLists [][]netmap.NodeInfo, repRules []uint) []netmap.NodeInfo {
	var capHint int
	for i := range nodeLists {
		capHint += primaryNodesCount(nodeLists[i], repRules, i)
	}

	left := make([]netmap.NodeInfo, 0, capHint)
	visited := make(map[string]struct{}, capHint)

	p.nodeResultsMtx.RLock()
	defer p.nodeResultsMtx.RUnlock()

	for i, list := range nodeLists {
		primaries := list[:primaryNodesCount(list, repRules, i)]
		for _, node := range primaries {
			key := string(node.PublicKey())
			if _, dup := visited[key]; dup {
				continue
			}
			visited[key] = struct{}{}

			if nr, ok := p.nodeResults[key]; ok && nr.succeeded {
				// Node already confirmed the object.
				continue
			}

			left = append(left, node)
		}
	}

	return left
}

// primaryNodesCount reports how many leading elements of nodes are primary
// for the i-th REP rule: repRules[i] capped by the list length, or the whole
// list when i has no corresponding rule.
func primaryNodesCount(nodes []netmap.NodeInfo, repRules []uint, i int) int {
	if i < len(repRules) {
		if n := int(repRules[i]); n < len(nodes) {
			return n
		}
	}
	return len(nodes)
}

func repToNode(l *zap.Logger, prog *repProgress, pubKeyStr string, listInd int, nr nodeResult, f func(nodeDesc) error) {
nr.desc.placementVector = listInd
err := f(nr.desc)
Expand Down
27 changes: 22 additions & 5 deletions pkg/services/object/put/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ func (t *distributedTarget) applyECRule(signer neofscrypto.Signer, obj object.Ob
}

func (t *distributedTarget) formAndSaveObjectForECPart(signer neofscrypto.Signer, obj object.Object, ruleIdx, partIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error {
partObj, err := iec.FormObjectForECPart(signer, obj, payloadParts[partIdx], iec.PartInfo{
RuleIndex: ruleIdx,
Index: partIdx,
})
partObj, err := formObjectForECPart(signer, obj, ruleIdx, partIdx, payloadParts)
if err != nil {
return fmt.Errorf("form object for part: %w", err)
return err
}

var encObj encodedObject
Expand Down Expand Up @@ -77,6 +74,18 @@ func (t *distributedTarget) formAndSaveObjectForECPart(signer neofscrypto.Signer
return nil
}

// formObjectForECPart builds and signs the object carrying the partIdx-th
// payload part of obj produced by the ruleIdx-th EC rule. Returns a zero
// object with a wrapped error on failure.
func formObjectForECPart(signer neofscrypto.Signer, obj object.Object, ruleIdx, partIdx int, payloadParts [][]byte) (object.Object, error) {
	pi := iec.PartInfo{
		RuleIndex: ruleIdx,
		Index:     partIdx,
	}

	partObj, err := iec.FormObjectForECPart(signer, obj, payloadParts[partIdx], pi)
	if err != nil {
		return object.Object{}, fmt.Errorf("form object for part: %w", err)
	}
	return partObj, nil
}

func (t *distributedTarget) saveECPart(part object.Object, encObj encodedObject, ruleIdx, partIdx, totalParts int, nodeList []netmap.NodeInfo,
metaC *metaCollection) error {
return t.distributeObjectWithMeta(part, encObj, metaC, func(obj object.Object, encObj encodedObject) error {
Expand Down Expand Up @@ -117,3 +126,11 @@ func (t *distributedTarget) saveECPartOnNode(ruleIdx int, obj object.Object, enc

return t.sendObject(obj, enc, n, metaC)
}

// ecNodesForPart selects the nodes from nodeList that should hold the
// partIdx-th of totalParts EC parts, in the preference order defined by
// iec.NodeSequenceForPart.
// NOTE(review): `range` yields iterator values only if NodeSequenceForPart
// returns a range-over-func iterator; if it returns a slice, `i` here is the
// position, not the element — confirm against its definition.
func ecNodesForPart(nodeList []netmap.NodeInfo, partIdx, totalParts int) []netmap.NodeInfo {
res := make([]netmap.NodeInfo, 0, len(nodeList))
for i := range iec.NodeSequenceForPart(partIdx, totalParts, len(nodeList)) {
res = append(res, nodeList[i])
}
return res
}
16 changes: 16 additions & 0 deletions pkg/services/object/put/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session/v2"
"github.com/nspcc-dev/neofs-sdk-go/user"
Expand Down Expand Up @@ -65,6 +66,13 @@ type ClientConstructor interface {
Get(context.Context, client.NodeInfo) (client.MultiAddressClient, error)
}

// PostPlacementReplicator performs replication of objects that
// were initially placed using relaxed limits and should be brought to the full
// placement policy immediately after PUT succeeds.
type PostPlacementReplicator interface {
// HandlePostPlacement handles replication of the given object to the
// given candidate nodes after its initial placement.
HandlePostPlacement(*object.Object, []netmapsdk.NodeInfo)
}

// ContainerNodes provides access to storage nodes matching storage policy of
// the particular container.
type ContainerNodes interface {
Expand Down Expand Up @@ -139,6 +147,8 @@ type cfg struct {

metaSvc *meta.Meta

postPlacementReplicator PostPlacementReplicator

quotaLimiter QuotaLimiter
payments PaymentChecker

Expand Down Expand Up @@ -264,3 +274,9 @@ func WithNNSResolver(resolver session.NNSResolver) Option {
c.nnsResolver = resolver
}
}

// WithPostPlacementReplicator returns option to set the component that
// replicates objects placed under relaxed initial limits up to the full
// placement policy.
func WithPostPlacementReplicator(v PostPlacementReplicator) Option {
return func(c *cfg) {
c.postPlacementReplicator = v
}
}
Loading
Loading