sn/object: add post-initial placement replication#3923
sn/object: add post-initial placement replication#3923
Conversation
| type cfg struct { | ||
| putTimeout time.Duration | ||
| queueSize int | ||
| workers int |
There was a problem hiding this comment.
These parameters can be made configurable if this type of queue is suitable
There was a problem hiding this comment.
Meh, we've just removed replicator.pool_size. It'd be nice to not have any configurations, let's have like 8 for now and we'll improve on it later.
There was a problem hiding this comment.
no need to have them as cfg fields if they are not configurable. Just use consts
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3923 +/- ##
==========================================
+ Coverage 26.82% 26.88% +0.05%
==========================================
Files 670 671 +1
Lines 44294 44427 +133
==========================================
+ Hits 11881 11943 +62
- Misses 31309 31373 +64
- Partials 1104 1111 +7 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| type cfg struct { | ||
| putTimeout time.Duration | ||
| queueSize int | ||
| workers int |
There was a problem hiding this comment.
Meh, we've just removed replicator.pool_size. It'd be nice to not have any configurations, let's have like 8 for now and we'll improve on it later.
| } | ||
|
|
||
| t.replicateRemainingPrimaryNodes(obj, objNodeLists[:len(fullRepRules)], fullRepRules, repProg) | ||
| t.replicateRemainingECRules(obj, ecRules, objNodeLists[len(fullRepRules):], appliedECRules) |
There was a problem hiding this comment.
Needs to be checked for various buffer reuses. Although obj is not a problem and objNodeLists should be fine too.
There was a problem hiding this comment.
The only thing I found is that replicator.Task now crosses the boundary of an asynchronous call, so it needs to maintain its own list of nodes instead of aliasing caller memory.
ba50162 to
664367c
Compare
pkg/services/replicator/queue.go
Outdated
| ) | ||
|
|
||
| // ErrQueueFull means that a replication task could not be queued immediately. | ||
| var ErrQueueFull = errors.New("replication queue is full") |
There was a problem hiding this comment.
used inside the package only, why exported?
pkg/services/replicator/queue.go
Outdated
| }) | ||
| } | ||
|
|
||
| <-ctx.Done() |
There was a problem hiding this comment.
seems excessive, we wait all workes to be interrupted by ctx anyway
pkg/services/replicator/queue.go
Outdated
| case <-ctx.Done(): | ||
| return | ||
| case task := <-p.taskQueue: | ||
| res := new(countingReplicationResult) |
There was a problem hiding this comment.
can be reused b/w iterations
| // SetNodes sets a list of potential object holders. | ||
| func (t *Task) SetNodes(v []netmap.NodeInfo) { | ||
| t.nodes = v | ||
| t.nodes = slices.Clone(v) |
There was a problem hiding this comment.
why cloning is needed?
There was a problem hiding this comment.
I thought about how the queue changed Task lifetime: it is no longer executed immediately, so it should own nodes instead of aliasing caller memory.
There was a problem hiding this comment.
instead of aliasing caller memory
why not? It does not mutate it
There was a problem hiding this comment.
You're right, I don't currently see a place where that slice is mutated after being passed in. I added the clone as a safety measure because Task is now queued and can outlive the caller. If we don't want the extra copy, i can drop it and document that the caller must not modify or reuse the slice after SetNodes.
|
|
||
| localNodeKey LocalNodeKey | ||
|
|
||
| taskQueue chan Task |
There was a problem hiding this comment.
lets make this a Replicator field. It's configured but initialized internally
| type cfg struct { | ||
| putTimeout time.Duration | ||
| queueSize int | ||
| workers int |
There was a problem hiding this comment.
no need to have them as cfg fields if they are not configurable. Just use consts
cmd/neofs-node/object.go
Outdated
| policer.WithBoostMultiplier(c.appCfg.Policer.BoostMultiplier), | ||
| ) | ||
|
|
||
| c.workers = append(c.workers, c.replicator) |
There was a problem hiding this comment.
merge into existing append
pkg/services/object/put/ec.go
Outdated
| } | ||
|
|
||
| func ecNodesForPart(nodeList []netmap.NodeInfo, partIdx, totalParts int) []netmap.NodeInfo { | ||
| return slices.Collect(func(ni func(netmap.NodeInfo) bool) { |
There was a problem hiding this comment.
looks trickier than it might be. Just make a slice of len(nodeList) and fill in for
cmd/neofs-node/object.go
Outdated
| } | ||
|
|
||
| func (r putPostPlacementReplicator) HandlePostPlacement(obj *object.Object, nodes []netmapsdk.NodeInfo) { | ||
| if len(nodes) == 0 { |
There was a problem hiding this comment.
it's senseless to me to call this w/ empty node list. Single current caller prevents this on it's own
There was a problem hiding this comment.
There were two callers, but I understood the point and checked before making the call.
| } | ||
|
|
||
| func (t *distributedTarget) replicateRemainingPrimaryNodes(obj object.Object, nodeLists [][]netmap.NodeInfo, repRules []uint, prog *repProgress) { | ||
| if t.initialPolicy == nil || t.postPlacementReplicator == nil || len(nodeLists) == 0 || prog == nil { |
There was a problem hiding this comment.
i wouldn't call this method in these cases at all. Same for EC
btw in which cases t.postPlacementReplicator is nil?
There was a problem hiding this comment.
Done.
btw in which cases t.postPlacementReplicator is nil?
Only in the tests, but I fixed that behavior.
14be6c2 to
a4161c8
Compare
Queue post-placement tasks in replicator. Start post-placement replication after successful initial placement. Handle both REP replicas and EC part placement. Add tests. Closes #3880. Signed-off-by: Andrey Butusov <andrey@nspcc.io>
Closes #3880.