@@ -23,14 +23,12 @@ package replication
2323import (
2424 "context"
2525 "fmt"
26+ "strconv"
2627 "time"
2728
2829 "github.com/arangodb/arangosync-client/client"
29- "github.com/arangodb/arangosync-client/client/synccheck"
30- "github.com/arangodb/go-driver"
3130
3231 api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
33- "github.com/arangodb/kube-arangodb/pkg/deployment/features"
3432 "github.com/arangodb/kube-arangodb/pkg/util/errors"
3533)
3634
@@ -68,7 +66,6 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
6866 }
6967 timeout := CancellationTimeout + AbortTimeout
7068 if isTimeExceeded (timestamp , timeout ) {
71- // Cancellation and abort timeout exceeded, so it must go into failed state.
7269 dr .failOnError (err , fmt .Sprintf ("Failed to cancel synchronization in %s" , timeout .String ()))
7370 }
7471 }
@@ -80,12 +77,6 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
8077 if err != nil {
8178 dr .log .Err (err ).Warn ("Failed to create destination syncmaster client" )
8279 } else {
83- destArangosyncVersion , err := destClient .Version (ctx )
84- if err != nil {
85- dr .log .Err (err ).Warn ("Failed to get destination arangosync version" )
86- hasError = true
87- }
88-
8980 // Fetch status of destination
9081 updateStatusNeeded := false
9182 configureSyncNeeded := false
@@ -108,8 +99,7 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
10899 // Destination is correctly configured
109100 dr .status .Conditions .Update (api .ConditionTypeConfigured , true , api .ConditionConfiguredReasonActive ,
110101 "Destination syncmaster is configured correctly and active" )
111- dr .status .IncomingSynchronization = dr .inspectIncomingSynchronizationStatus (ctx , destClient ,
112- driver .Version (destArangosyncVersion .Version ), destStatus .Shards )
102+ dr .status .IncomingSynchronization = dr .inspectIncomingSynchronizationStatus (destStatus )
113103 updateStatusNeeded = true
114104 } else {
115105 // Sync is active, but from different source
@@ -249,91 +239,28 @@ func (dr *DeploymentReplication) hasOutgoingEndpoint(status client.SyncInfo, epS
249239}
250240
251241// inspectIncomingSynchronizationStatus returns the synchronization status for the incoming sync
252- func (dr * DeploymentReplication ) inspectIncomingSynchronizationStatus (ctx context.Context , syncClient client.API , arangosyncVersion driver.Version , localShards []client.ShardSyncInfo ) api.SynchronizationStatus {
253- dataCentersResp , err := syncClient .Master ().GetDataCentersInfo (ctx )
254- if err != nil {
255- errMsg := "Failed to fetch data-centers info"
256- dr .log .Err (err ).Warn (errMsg )
257- return api.SynchronizationStatus {
258- Error : fmt .Sprintf ("%s: %s" , errMsg , err .Error ()),
259- }
260- }
242+ func (dr * DeploymentReplication ) inspectIncomingSynchronizationStatus (destStatus client.SyncInfo ) api.SynchronizationStatus {
243+ const maxReportedIncomingSyncErrorsPerDatabase = 10
261244
262- ch := synccheck .NewSynchronizationChecker (syncClient , time .Minute )
263- incomingSyncStatus , err := ch .CheckSync (ctx , & dataCentersResp , localShards )
264- if err != nil {
265- errMsg := "Failed to check synchronization status"
266- dr .log .Err (err ).Warn (errMsg )
267- return api.SynchronizationStatus {
268- Error : fmt .Sprintf ("%s: %s" , errMsg , err .Error ()),
245+ dbs := make (map [string ]api.DatabaseSynchronizationStatus , 0 )
246+ for _ , s := range destStatus .Shards {
247+ db := dbs [s .Database ]
248+ db .ShardsTotal ++
249+ if s .Status == client .SyncStatusRunning {
250+ db .ShardsInSync ++
251+ } else if s .Status == client .SyncStatusFailed && len (db .Errors ) < maxReportedIncomingSyncErrorsPerDatabase {
252+ db .Errors = append (db .Errors , api.DatabaseSynchronizationError {
253+ Collection : s .Collection ,
254+ Shard : strconv .Itoa (s .ShardIndex ),
255+ Message : fmt .Sprintf ("shard sync failed: %s" , s .StatusMessage ),
256+ })
269257 }
258+ dbs [s .Database ] = db
270259 }
271- return dr .createSynchronizationStatus (arangosyncVersion , incomingSyncStatus )
272- }
273260
274- // createSynchronizationStatus returns aggregated info about DCSyncStatus
275- func (dr * DeploymentReplication ) createSynchronizationStatus (arangosyncVersion driver.Version , dcSyncStatus * synccheck.DCSyncStatus ) api.SynchronizationStatus {
276- dbs := make (map [string ]api.DatabaseSynchronizationStatus , len (dcSyncStatus .Databases ))
277- i := 0
278- for dbName , dbSyncStatus := range dcSyncStatus .Databases {
279- i ++
280- db := dbName
281- if features .SensitiveInformationProtection ().Enabled () {
282- // internal IDs are not available in older versions
283- if arangosyncVersion .CompareTo ("2.12.0" ) >= 0 {
284- db = dbSyncStatus .ID
285- } else {
286- db = fmt .Sprintf ("<PROTECTED_INFO_%d>" , i )
287- }
288- }
289- dbs [db ] = dr .createDatabaseSynchronizationStatus (dbSyncStatus )
290- }
291261 return api.SynchronizationStatus {
292- AllInSync : dcSyncStatus . AllInSync () ,
262+ AllInSync : destStatus . Status == client . SyncStatusRunning ,
293263 Databases : dbs ,
294264 Error : "" ,
295265 }
296266}
297-
298- // createDatabaseSynchronizationStatus returns sync status for DB
299- func (dr * DeploymentReplication ) createDatabaseSynchronizationStatus (dbSyncStatus synccheck.DatabaseSyncStatus ) api.DatabaseSynchronizationStatus {
300- // use limit for errors because the resulting status object should not be too big
301- const maxReportedIncomingSyncErrors = 20
302-
303- var errs []api.DatabaseSynchronizationError
304- var shardsTotal , shardsInSync int
305- var errorsReportedToLog = 0
306- for colName , colSyncStatus := range dbSyncStatus .Collections {
307- if colSyncStatus .Error != "" && len (errs ) < maxReportedIncomingSyncErrors {
308- col := colName
309- if features .SensitiveInformationProtection ().Enabled () {
310- col = colSyncStatus .ID
311- }
312-
313- errs = append (errs , api.DatabaseSynchronizationError {
314- Collection : col ,
315- Shard : "" ,
316- Message : colSyncStatus .Error ,
317- })
318- }
319-
320- shardsTotal += len (colSyncStatus .Shards )
321- for shardIndex , shardSyncStatus := range colSyncStatus .Shards {
322- if shardSyncStatus .InSync {
323- shardsInSync ++
324- } else if errorsReportedToLog < maxReportedIncomingSyncErrors {
325- dr .log .Str ("db" , dbSyncStatus .ID ).
326- Str ("col" , colSyncStatus .ID ).
327- Int ("shard" , shardIndex ).
328- Debug ("incoming synchronization shard status is not in-sync: %s" , shardSyncStatus .Message )
329- errorsReportedToLog ++
330- }
331- }
332- }
333-
334- return api.DatabaseSynchronizationStatus {
335- ShardsTotal : shardsTotal ,
336- ShardsInSync : shardsInSync ,
337- Errors : errs ,
338- }
339- }
0 commit comments