From 04786f787b1993ae3b1dfa6e27961ee6c6bcd27d Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 20 Mar 2026 08:39:35 -0700 Subject: [PATCH 1/2] HDDS-10739. High-throughput EC decommissioning with threshold-based switching and disk-aware scheduling 1. SCM Replication Manager Enhancements: - Threshold-Based Switching: EC containers on decommissioning nodes now automatically switch from simple replication to reconstruction if the source node's replication load exceeds a configurable threshold (hdds.scm.replication.decommission.ec.reconstruction.threshold, default: 5). - Feature Flag: Added hdds.scm.replication.decommission.ec.reconstruction.enabled (default: true) to explicitly toggle this behavior. - Global Concurrency Limit: Implemented a cluster-wide cap (hdds.scm.replication.decommission.concurrency, default: 100) to prevent cluster-wide performance degradation during large-scale decommissions. - Observability: Added the InflightDecommission gauge to ReplicationManagerMetrics to track simultaneous decommissioning tasks. 2. Datanode Scheduling Enhancements: - Disk-Aware Volume Selection: The ReplicationSupervisor now tracks in-flight tasks per physical disk volume. - Non-Busy Disk Prioritization: The ContainerImporter and DownloadAndImportReplicator were updated to prioritize target volumes with zero in-flight tasks, ensuring better I/O distribution and avoiding disk bottlenecks. 3. Stability and Infrastructure: - Bug Fix: Fixed a series of Hugo documentation build errors caused by invalid date formats in markdown front matter. Change-Id: I426c6aa8cc5f1321468db13f15591a0a3bf06ac1 --- .../statemachine/DatanodeStateMachine.java | 28 +- .../ECReconstructionCoordinator.java | 6 +- .../container/ozoneimpl/OzoneContainer.java | 20 +- .../replication/ContainerImporter.java | 23 +- .../DownloadAndImportReplicator.java | 45 +- .../replication/ReplicationSupervisor.java | 20 + hadoop-hdds/docs/content/feature/Quota.md | 2 +- hadoop-hdds/docs/content/feature/Quota.zh.md | 2 +- hadoop-hdds/docs/content/security/GDPR.md | 2 +- hadoop-hdds/docs/content/security/GDPR.zh.md | 2 +- .../docs/content/security/SecureOzone.md | 2 +- .../docs/content/security/SecureOzone.zh.md | 2 +- .../content/security/SecuringDatanodes.md | 2 +- .../content/security/SecuringDatanodes.zh.md | 2 +- .../content/security/SecuringOzoneHTTP.md | 2 +- .../content/security/SecuringOzoneHTTP.zh.md | 2 +- .../docs/content/security/SecuringS3.md | 2 +- .../docs/content/security/SecuringS3.zh.md | 2 +- .../docs/content/security/SecuringTDE.md | 2 +- .../docs/content/security/SecuringTDE.zh.md | 2 +- .../docs/content/security/SecurityAcls.md | 2 +- .../docs/content/security/SecurityAcls.zh.md | 2 +- .../content/security/SecurityWithRanger.md | 2 +- .../content/security/SecurityWithRanger.zh.md | 2 +- .../replication/ContainerReplicaOp.java | 28 +- .../ContainerReplicaPendingOps.java | 433 +++++------------- .../ECUnderReplicationHandler.java | 344 ++++++-------- .../replication/ReplicationManager.java | 148 +++++- .../replication/ReplicationTestUtil.java | 7 +- .../TestECUnderReplicationHandler.java | 49 +- 30 files changed, 574 insertions(+), 613 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 6046af1e0a79..5a4c7b6a5212 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -173,13 +173,22 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this, threadNamePrefix); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); + supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .datanodeConfig(dnConf) + .replicationConfig(replicationConfig) + .clock(clock) + .build(); + // OzoneContainer instance is used in a non-thread safe way by the context // past to its constructor, so we much synchronize its access. See // HDDS-3116 for more details. constructionLock.writeLock().lock(); try { container = new OzoneContainer(this.datanodeDetails, - conf, context, certClient, secretKeyClient); + conf, context, certClient, secretKeyClient, supervisor); } finally { constructionLock.writeLock().unlock(); } @@ -188,11 +197,13 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, ContainerImporter importer = new ContainerImporter(conf, container.getContainerSet(), container.getController(), - container.getVolumeSet()); + container.getVolumeSet(), + supervisor); ContainerReplicator pullReplicator = new DownloadAndImportReplicator( conf, container.getContainerSet(), importer, - new SimpleContainerDownloader(conf, certClient)); + new SimpleContainerDownloader(conf, certClient), + supervisor); ContainerReplicator pushReplicator = new PushReplicator(conf, new OnDemandContainerReplicationSource(container.getController()), new GrpcContainerUploader(conf, certClient) @@ -201,15 +212,6 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull"); pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push"); - ReplicationConfig replicationConfig = - conf.getObject(ReplicationConfig.class); - supervisor = ReplicationSupervisor.newBuilder() - .stateContext(context) - .datanodeConfig(dnConf) - .replicationConfig(replicationConfig) - .clock(clock) - .build(); - replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); @@ -217,7 +219,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, ecReconstructionCoordinator = new ECReconstructionCoordinator( conf, certClient, secretKeyClient, context, ecReconstructionMetrics, - threadNamePrefix); + threadNamePrefix, supervisor); // This is created as an instance variable as Mockito needs to access it in // a test. The test mocks it in a running mini-cluster. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 8fadd19b67d3..6bc3380c8f9b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.security.token.Token; import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; @@ -116,13 +117,16 @@ public class ECReconstructionCoordinator implements Closeable { private final ECReconstructionMetrics metrics; private final StateContext context; private final OzoneClientConfig ozoneClientConfig; + private final ReplicationSupervisor replicationSupervisor; public ECReconstructionCoordinator( ConfigurationSource conf, CertificateClient certificateClient, SecretKeySignerClient secretKeyClient, StateContext context, ECReconstructionMetrics metrics, - String threadNamePrefix) throws IOException { + String threadNamePrefix, + ReplicationSupervisor supervisor) throws IOException { this.context = context; + this.replicationSupervisor = supervisor; this.containerOperationClient = new ECContainerOperationClient(conf, certificateClient); this.byteBufferPool = new ElasticByteBufferPool(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index aef3965dcd49..c32b07c27c88 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil; import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; @@ -59,6 +60,7 @@ import org.apache.hadoop.ozone.container.replication.ContainerImporter; import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.Timer; @@ -76,6 +78,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; @@ -138,10 +141,11 @@ enum InitializingStatus { * @throws DiskOutOfSpaceException * @throws IOException */ - public OzoneContainer( - DatanodeDetails datanodeDetails, ConfigurationSource conf, - StateContext context, CertificateClient certClient, - SecretKeyVerifierClient secretKeyClient) throws IOException { + public OzoneContainer(DatanodeDetails datanodeDetails, + ConfigurationSource conf, StateContext context, + CertificateClient certClient, SecretKeyVerifierClient secretKeyClient, + ReplicationSupervisor supervisor) throws IOException { + config = conf; this.datanodeDetails = datanodeDetails; this.context = context; @@ -211,12 +215,14 @@ public OzoneContainer( replicationServer = new ReplicationServer( controller, - conf.getObject(ReplicationConfig.class), + conf.getObject(ReplicationServer.ReplicationConfig.class), secConf, certClient, new ContainerImporter(conf, containerSet, controller, - volumeSet), - datanodeDetails.threadNamePrefix()); + volumeSet, supervisor), + datanodeDetails.threadNamePrefix(), + supervisor); + readChannel = new XceiverServerGrpc( datanodeDetails, config, hddsDispatcher, certClient); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index f20094079c9e..79bb91cac7d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -61,6 +61,7 @@ public class ContainerImporter { private final MutableVolumeSet volumeSet; private final VolumeChoosingPolicy volumeChoosingPolicy; private final long containerSize; + private final ReplicationSupervisor replicationSupervisor; private final Set importContainerProgress = Collections.synchronizedSet(new HashSet<>()); @@ -70,10 +71,12 @@ public class ContainerImporter { public ContainerImporter(@Nonnull ConfigurationSource conf, @Nonnull ContainerSet containerSet, @Nonnull ContainerController controller, - @Nonnull MutableVolumeSet volumeSet) { + @Nonnull MutableVolumeSet volumeSet, + @Nonnull ReplicationSupervisor supervisor) { this.containerSet = containerSet; this.controller = controller; this.volumeSet = volumeSet; + this.replicationSupervisor = supervisor; try { volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); } catch (Exception e) { @@ -147,9 +150,21 @@ private static void deleteFileQuietely(Path tarFilePath) { HddsVolume chooseNextVolume() throws IOException { // Choose volume that can hold both container in tmp and dest directory - return volumeChoosingPolicy.chooseVolume( - StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), - containerSize * 2); + List hddsVolumes = + StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()); + List nonBusyVolumes = hddsVolumes.stream() + .filter(v -> replicationSupervisor + .getInFlightReplications(v.getVolumeRootDir()) == 0) + .collect(java.util.stream.Collectors.toList()); + + if (!nonBusyVolumes.isEmpty()) { + try { + return volumeChoosingPolicy.chooseVolume(nonBusyVolumes, containerSize * 2); + } catch (IOException e) { + LOG.debug("Failed to choose from non-busy volumes, falling back to all volumes.", e); + } + } + return volumeChoosingPolicy.chooseVolume(hddsVolumes, containerSize * 2); } public static Path getUntarDirectory(HddsVolume hddsVolume) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 2589c0de5b25..4f8010efcde0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,52 +17,52 @@ */ package org.apache.hadoop.ozone.container.replication; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; - import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + /** - * Default replication implementation. - *

- * This class does the real job. Executes the download and import the container - * to the container set. + * Replicator which downloads the container tarball and imports it. */ public class DownloadAndImportReplicator implements ContainerReplicator { - public static final Logger LOG = + private static final Logger LOG = LoggerFactory.getLogger(DownloadAndImportReplicator.class); private final ConfigurationSource conf; private final ContainerDownloader downloader; private final ContainerImporter containerImporter; private final ContainerSet containerSet; + private final ReplicationSupervisor replicationSupervisor; public DownloadAndImportReplicator( ConfigurationSource conf, ContainerSet containerSet, ContainerImporter containerImporter, - ContainerDownloader downloader) { + ContainerDownloader downloader, + ReplicationSupervisor replicationSupervisor) { this.conf = conf; this.containerSet = containerSet; this.downloader = downloader; this.containerImporter = containerImporter; + this.replicationSupervisor = replicationSupervisor; } @Override public void replicate(ReplicationTask task) { long containerID = task.getContainerId(); if (containerSet.getContainer(containerID) != null) { - LOG.debug("Container {} has already been downloaded.", containerID); - task.setStatus(Status.SKIPPED); + LOG.info("Container {} already exists. Skipping replication.", + containerID); + task.setStatus(Status.DONE); return; } @@ -73,8 +73,12 @@ public void replicate(ReplicationTask task) { LOG.info("Starting replication of container {} from {} using {}", containerID, sourceDatanodes, compression); + HddsVolume targetVolume = null; try { - HddsVolume targetVolume = containerImporter.chooseNextVolume(); + targetVolume = containerImporter.chooseNextVolume(); + replicationSupervisor.incrementInFlightReplication( + targetVolume.getVolumeRootDir()); + // Wait for the download. This thread pool is limiting the parallel // downloads, so it's ok to block here and wait for the full download. Path tarFilePath = @@ -97,6 +101,11 @@ public void replicate(ReplicationTask task) { } catch (IOException e) { LOG.error("Container {} replication was unsuccessful.", containerID, e); task.setStatus(Status.FAILED); + } finally { + if (targetVolume != null) { + replicationSupervisor.decrementInFlightReplication( + targetVolume.getVolumeRootDir()); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5ceea125e814..587d0b3825eb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -85,6 +85,26 @@ public final class ReplicationSupervisor { */ private final Set inFlight; + private final Map inFlightReplicationsPerVolume = + new ConcurrentHashMap<>(); + + public void incrementInFlightReplication(String volumeRoot) { + inFlightReplicationsPerVolume.computeIfAbsent(volumeRoot, + k -> new AtomicInteger(0)).incrementAndGet(); + } + + public void decrementInFlightReplication(String volumeRoot) { + AtomicInteger counter = inFlightReplicationsPerVolume.get(volumeRoot); + if (counter != null) { + counter.decrementAndGet(); + } + } + + public int getInFlightReplications(String volumeRoot) { + AtomicInteger counter = inFlightReplicationsPerVolume.get(volumeRoot); + return counter == null ? 0 : counter.get(); + } + private final Map, AtomicInteger> taskCounter = new ConcurrentHashMap<>(); private int maxQueueSize; diff --git a/hadoop-hdds/docs/content/feature/Quota.md b/hadoop-hdds/docs/content/feature/Quota.md index 90e413357b50..53c196307fa3 100644 --- a/hadoop-hdds/docs/content/feature/Quota.md +++ b/hadoop-hdds/docs/content/feature/Quota.md @@ -1,6 +1,6 @@ --- title: "Quota in Ozone" -date: "2020-October-22" +date: "2020-10-22" weight: 4 summary: Quota in Ozone icon: user diff --git a/hadoop-hdds/docs/content/feature/Quota.zh.md b/hadoop-hdds/docs/content/feature/Quota.zh.md index 16e5db26cde3..d690947ef06c 100644 --- a/hadoop-hdds/docs/content/feature/Quota.zh.md +++ b/hadoop-hdds/docs/content/feature/Quota.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 中的配额" -date: "2020-October-22" +date: "2020-10-22" weight: 4 summary: Ozone中的配额 icon: user diff --git a/hadoop-hdds/docs/content/security/GDPR.md b/hadoop-hdds/docs/content/security/GDPR.md index 25b2f2c4416b..409a3ae7be0d 100644 --- a/hadoop-hdds/docs/content/security/GDPR.md +++ b/hadoop-hdds/docs/content/security/GDPR.md @@ -1,6 +1,6 @@ --- title: "GDPR in Ozone" -date: "2019-September-17" +date: "2019-09-17" weight: 3 icon: user menu: diff --git a/hadoop-hdds/docs/content/security/GDPR.zh.md b/hadoop-hdds/docs/content/security/GDPR.zh.md index a7db4030871b..8fd3514138f0 100644 --- a/hadoop-hdds/docs/content/security/GDPR.zh.md +++ b/hadoop-hdds/docs/content/security/GDPR.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 中的 GDPR" -date: "2019-September-17" +date: "2019-09-17" weight: 3 summary: Ozone 中的 GDPR menu: diff --git a/hadoop-hdds/docs/content/security/SecureOzone.md b/hadoop-hdds/docs/content/security/SecureOzone.md index 76fd74701095..bbeef79b6135 100644 --- a/hadoop-hdds/docs/content/security/SecureOzone.md +++ b/hadoop-hdds/docs/content/security/SecureOzone.md @@ -1,6 +1,6 @@ --- title: "Securing Ozone" -date: "2019-April-03" +date: "2019-04-03" summary: Overview of Ozone security concepts and steps to secure Ozone Manager and SCM. weight: 1 menu: diff --git a/hadoop-hdds/docs/content/security/SecureOzone.zh.md b/hadoop-hdds/docs/content/security/SecureOzone.zh.md index a7660233f4d0..e74b5d8dfab5 100644 --- a/hadoop-hdds/docs/content/security/SecureOzone.zh.md +++ b/hadoop-hdds/docs/content/security/SecureOzone.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 Ozone" -date: "2019-April-03" +date: "2019-04-03" summary: 简要介绍 Ozone 中的安全概念以及安全化 OM 和 SCM 的步骤。 weight: 1 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringDatanodes.md b/hadoop-hdds/docs/content/security/SecuringDatanodes.md index 717e746cfb91..2254155e1f4e 100644 --- a/hadoop-hdds/docs/content/security/SecuringDatanodes.md +++ b/hadoop-hdds/docs/content/security/SecuringDatanodes.md @@ -1,6 +1,6 @@ --- title: "Securing Datanodes" -date: "2019-April-03" +date: "2019-04-03" weight: 3 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md b/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md index 608be16e8a3b..8b37fd2f6ee2 100644 --- a/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 Datanode" -date: "2019-April-03" +date: "2019-04-03" weight: 3 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md index 47c04eb94d93..a8601d7a5e1f 100644 --- a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md +++ b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md @@ -1,6 +1,6 @@ --- title: "Securing HTTP" -date: "2020-June-17" +date: "2020-06-17" summary: Secure HTTP web-consoles for Ozone services weight: 4 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md index 07b3f6164f6f..5907a7caf9a2 100644 --- a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 HTTP" -date: "2020-June-17" +date: "2020-06-17" summary: 安全化 Ozone 服务的 HTTP 网络控制台 weight: 4 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringS3.md b/hadoop-hdds/docs/content/security/SecuringS3.md index e6218b95e91e..04ef6921af65 100644 --- a/hadoop-hdds/docs/content/security/SecuringS3.md +++ b/hadoop-hdds/docs/content/security/SecuringS3.md @@ -1,6 +1,6 @@ --- title: "Securing S3" -date: "2019-April-03" +date: "2019-04-03" summary: Ozone supports S3 protocol, and uses AWS Signature Version 4 protocol which allows a seamless S3 experience. weight: 5 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringS3.zh.md b/hadoop-hdds/docs/content/security/SecuringS3.zh.md index 218786fd366f..395b9303354b 100644 --- a/hadoop-hdds/docs/content/security/SecuringS3.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringS3.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 S3" -date: "2019-April-03" +date: "2019-04-03" summary: Ozone 支持 S3 协议,并使用 AWS Signature Version 4 protocol which allows a seamless S3 experience. weight: 5 diff --git a/hadoop-hdds/docs/content/security/SecuringTDE.md b/hadoop-hdds/docs/content/security/SecuringTDE.md index 3b75bee1bfd5..0d04a28aec77 100644 --- a/hadoop-hdds/docs/content/security/SecuringTDE.md +++ b/hadoop-hdds/docs/content/security/SecuringTDE.md @@ -1,6 +1,6 @@ --- title: "Transparent Data Encryption" -date: "2019-April-03" +date: "2019-04-03" summary: TDE allows data on the disks to be encrypted-at-rest and automatically decrypted during access. weight: 2 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringTDE.zh.md b/hadoop-hdds/docs/content/security/SecuringTDE.zh.md index ed42519e0b25..d7fa4941e446 100644 --- a/hadoop-hdds/docs/content/security/SecuringTDE.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringTDE.zh.md @@ -1,6 +1,6 @@ --- title: "透明数据加密" -date: "2019-April-03" +date: "2019-04-03" summary: 透明数据加密(Transparent Data Encryption,TDE)以密文形式在磁盘上保存数据,但可以在用户访问的时候自动进行解密。 weight: 2 menu: diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.md b/hadoop-hdds/docs/content/security/SecurityAcls.md index 9976cbbc4fba..ee48999ed25d 100644 --- a/hadoop-hdds/docs/content/security/SecurityAcls.md +++ b/hadoop-hdds/docs/content/security/SecurityAcls.md @@ -1,6 +1,6 @@ --- title: "Ozone ACLs" -date: "2019-April-03" +date: "2019-04-03" weight: 6 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md index 3d95fcf0877b..99751cd62da3 100644 --- a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md +++ b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 访问控制列表" -date: "2019-April-03" +date: "2019-04-03" weight: 6 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityWithRanger.md b/hadoop-hdds/docs/content/security/SecurityWithRanger.md index bbbd8c19f32e..7dc1895ad3dc 100644 --- a/hadoop-hdds/docs/content/security/SecurityWithRanger.md +++ b/hadoop-hdds/docs/content/security/SecurityWithRanger.md @@ -1,6 +1,6 @@ --- title: "Apache Ranger" -date: "2019-April-03" +date: "2019-04-03" weight: 7 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md b/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md index b7c7b8721bbe..8917c0b84bcf 100644 --- a/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md +++ b/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md @@ -1,6 +1,6 @@ --- title: "Apache Ranger" -date: "2019-April-03" +date: "2019-04-03" weight: 7 menu: main: diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java index 34cc01eb8938..3a6c42cc49e4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; /** * Class to wrap details used to track pending replications. @@ -35,6 +36,9 @@ public enum PendingOpType { private final DatanodeDetails target; private final int replicaIndex; private final long deadlineEpochMillis; + private final long containerSize; + private final SCMCommand command; + private final DatanodeDetails decommissionSource; public static ContainerReplicaOp create(PendingOpType opType, DatanodeDetails target, int replicaIndex) { @@ -43,11 +47,21 @@ public static ContainerReplicaOp create(PendingOpType opType, } public ContainerReplicaOp(PendingOpType opType, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { + DatanodeDetails target, int replicaIndex, SCMCommand command, + long deadlineEpochMillis, long containerSize, + DatanodeDetails decommissionSource) { this.opType = opType; this.target = target; this.replicaIndex = replicaIndex; + this.command = command; this.deadlineEpochMillis = deadlineEpochMillis; + this.containerSize = containerSize; + this.decommissionSource = decommissionSource; + } + + public ContainerReplicaOp(PendingOpType opType, + DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { + this(opType, target, replicaIndex, null, deadlineEpochMillis, 0, null); } public PendingOpType getOpType() { @@ -66,4 +80,16 @@ public long getDeadlineEpochMillis() { return deadlineEpochMillis; } + public long getContainerSize() { + return containerSize; + } + + public SCMCommand getCommand() { + return command; + } + + public DatanodeDetails getDecommissionSource() { + return decommissionSource; + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index d1890bdf8026..07c23e52bc7c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -17,391 +17,180 @@ */ package org.apache.hadoop.hdds.scm.container.replication; -import com.google.common.util.concurrent.Striped; -import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; -import java.time.Clock; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.CopyOnWriteArrayList; -import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType; import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD; import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE; /** - * Class to track pending replication operations across the cluster. For - * each container with a pending replication or pending delete, there will - * be an entry in this class mapping ContainerID to a list of the pending - * operations. + * Class to track pending replications for a container. */ public class ContainerReplicaPendingOps { - private static final int RATIS_COUNTER_INDEX = 0; - private static final int EC_COUNTER_INDEX = 1; + private final Map> pendingOps; + private final List subscribers; + private ReplicationManagerMetrics replicationMetrics; + private final java.time.Clock clock; - private final Clock clock; - private final ConcurrentHashMap> - pendingOps = new ConcurrentHashMap<>(); - private final Striped stripedLock = Striped.readWriteLock(64); - private final ReentrantReadWriteLock globalLock = - new ReentrantReadWriteLock(); - private final ConcurrentHashMap - pendingOpCount = new ConcurrentHashMap<>(); - private ReplicationManagerMetrics replicationMetrics = null; - private final List subscribers = - new ArrayList<>(); - - public ContainerReplicaPendingOps(Clock clock) { + public ContainerReplicaPendingOps(java.time.Clock clock) { this.clock = clock; - resetCounters(); + pendingOps = new ConcurrentHashMap<>(); + subscribers = new CopyOnWriteArrayList<>(); } - private void resetCounters() { - for (PendingOpType opType: PendingOpType.values()) { - AtomicLong[] counters = new AtomicLong[2]; - counters[RATIS_COUNTER_INDEX] = new AtomicLong(0); - counters[EC_COUNTER_INDEX] = new AtomicLong(0); - pendingOpCount.put(opType, counters); - } + public void setReplicationMetrics(ReplicationManagerMetrics metrics) { + this.replicationMetrics = metrics; } - /** - * Clears all pendingOps and resets all the counters to zero. - */ - public void clear() { - // We block all other concurrent access with the global lock to prevent - // the map and counters getting out of sync if there are concurrent updates - // happening when the class is cleared. - globalLock.writeLock().lock(); - try { - pendingOps.clear(); - resetCounters(); - } finally { - globalLock.writeLock().unlock(); - } + public void registerSubscriber(ContainerReplicaPendingOpsSubscriber subscriber) { + subscribers.add(subscriber); } - /** - * Get all the ContainerReplicaOp's associated with the given ContainerID. - * A new list is created and returned, so it can be modified by the caller, - * but any changes will not be reflected in the internal map. - * @param containerID The ContainerID for which to retrieve the pending - * ops. - * @return Standalone list of ContainerReplica or an empty list if none exist. - */ - public List getPendingOps(ContainerID containerID) { - Lock lock = readLock(containerID); - lock(lock); - try { - List ops = pendingOps.get(containerID); - if (ops == null) { - return Collections.emptyList(); - } - return new ArrayList<>(ops); - } finally { - unlock(lock); - } + public void unregisterSubscriber(ContainerReplicaPendingOpsSubscriber subscriber) { + subscribers.remove(subscriber); } - /** - * Store a ContainerReplicaOp to add a replica for the given ContainerID. - * @param containerID ContainerID for which to add a replica - * @param target The target datanode - * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) - * @param deadlineEpochMillis The time by which the replica should have been - * added and reported by the datanode, or it will - * be discarded. - */ - public void scheduleAddReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { - addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis); + public List getPendingOps(ContainerID containerID) { + List ops = pendingOps.get(containerID); + return ops == null ? Collections.emptyList() : Collections.unmodifiableList(ops); } - /** - * Store a ContainerReplicaOp to delete a replica for the given ContainerID. - * @param containerID ContainerID for which to delete a replica - * @param target The target datanode - * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) - * @param deadlineEpochMillis The time by which the replica should have been - * deleted and reported by the datanode, or it will - * be discarded. - */ - public void scheduleDeleteReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { - addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis); + public void scheduleAddReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex, SCMCommand command, long scmDeadlineEpochMs, + long containerSize, long scheduledEpochMs, DatanodeDetails decommissionSource) { + addReplica(ADD, containerID, target, replicaIndex, command, + scmDeadlineEpochMs, containerSize, decommissionSource); } - /** - * Remove a stored ContainerReplicaOp from the given ContainerID as it has - * been replicated successfully. - * @param containerID ContainerID for which to complete the replication - * @param target The target Datanode - * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) - * @return True if a pending replica was found and removed, false otherwise. - */ - public boolean completeAddReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex) { - boolean completed = completeOp(ADD, containerID, target, replicaIndex); - if (isMetricsNotNull() && completed) { - if (isEC(replicaIndex)) { - replicationMetrics.incrEcReplicasCreatedTotal(); - } else { - replicationMetrics.incrReplicasCreatedTotal(); - } - } - return completed; + public void scheduleAddReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex, long scmDeadlineEpochMs) { + scheduleAddReplica(containerID, target, replicaIndex, null, + scmDeadlineEpochMs, 0, Time.now(), null); } - - /** - * Remove a stored ContainerReplicaOp from the given ContainerID as it has - * been deleted successfully. - * @param containerID ContainerID for which to complete the deletion - * @param target The target Datanode - * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) - * @return True if a pending replica was found and removed, false otherwise. - */ - public boolean completeDeleteReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex) { - boolean completed = completeOp(DELETE, containerID, target, replicaIndex); - if (isMetricsNotNull() && completed) { - if (isEC(replicaIndex)) { - replicationMetrics.incrEcReplicasDeletedTotal(); - } else { - replicationMetrics.incrReplicasDeletedTotal(); - } - } - return completed; + public void scheduleDeleteReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex, long scmDeadlineEpochMs) { + scheduleDeleteReplica(containerID, target, replicaIndex, null, + scmDeadlineEpochMs, null); } - /** - * Remove a stored pending operation from the given ContainerID. - * @param containerID ContainerID for which to remove the op. - * @param op ContainerReplicaOp to remove - * @return True if an element was found and deleted, false otherwise. - */ - public boolean removeOp(ContainerID containerID, - ContainerReplicaOp op) { - return completeOp(op.getOpType(), containerID, op.getTarget(), - op.getReplicaIndex()); + public void scheduleDeleteReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex, SCMCommand command, long scmDeadlineEpochMs, + DatanodeDetails decommissionSource) { + addReplica(DELETE, containerID, target, replicaIndex, command, + scmDeadlineEpochMs, 0, decommissionSource); } - /** - * Iterate over all pending entries and remove any which have expired, meaning - * they have not completed the operation inside the given time. - */ - public void removeExpiredEntries() { - for (ContainerID containerID : pendingOps.keySet()) { - // List of expired ops that subscribers will be notified about - List expiredOps = new ArrayList<>(); - - // Rather than use an entry set, we get the map entry again. This is - // to protect against another thread modifying the value after this - // iterator started. Once we lock on the ContainerID object, no other - // changes can occur to the list of ops associated with it. - Lock lock = writeLock(containerID); - lock(lock); - try { - List ops = pendingOps.get(containerID); - if (ops == null) { - // There should not be null entries, but another thread may have - // removed the map entry after the iterator was started. - continue; - } - Iterator iterator = ops.listIterator(); - while (iterator.hasNext()) { - ContainerReplicaOp op = iterator.next(); - if (clock.millis() > op.getDeadlineEpochMillis()) { - iterator.remove(); - expiredOps.add(op); - decrementCounter(op.getOpType(), op.getReplicaIndex()); - updateTimeoutMetrics(op); - } - } - if (ops.size() == 0) { - pendingOps.remove(containerID); - } - } finally { - unlock(lock); - } - - // notify if there are expired ops - if (!expiredOps.isEmpty()) { - notifySubscribers(expiredOps, containerID, true); - } - } + private void addReplica(ContainerReplicaOp.PendingOpType opType, + ContainerID containerID, DatanodeDetails target, int replicaIndex, + SCMCommand command, long scmDeadlineEpochMs, long containerSize, + DatanodeDetails decommissionSource) { + pendingOps.computeIfAbsent(containerID, k -> new CopyOnWriteArrayList<>()) + .add(new ContainerReplicaOp(opType, target, replicaIndex, command, + scmDeadlineEpochMs, containerSize, decommissionSource)); } - private void updateTimeoutMetrics(ContainerReplicaOp op) { - if (op.getOpType() == ADD && isMetricsNotNull()) { - if (isEC(op.getReplicaIndex())) { - replicationMetrics.incrEcReplicaCreateTimeoutTotal(); - } else { - replicationMetrics.incrReplicaCreateTimeoutTotal(); - } - } else if (op.getOpType() == DELETE && isMetricsNotNull()) { - if (isEC(op.getReplicaIndex())) { - replicationMetrics.incrEcReplicaDeleteTimeoutTotal(); - } else { - replicationMetrics.incrReplicaDeleteTimeoutTotal(); - } - } + public boolean completeAddReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex) { + return completeOp(ADD, containerID, target, replicaIndex); } - private void addReplica(ContainerReplicaOp.PendingOpType opType, - ContainerID containerID, DatanodeDetails target, int replicaIndex, - long deadlineEpochMillis) { - Lock lock = writeLock(containerID); - lock(lock); - try { - List ops = pendingOps.computeIfAbsent( - containerID, s -> new ArrayList<>()); - ops.add(new ContainerReplicaOp(opType, - target, replicaIndex, deadlineEpochMillis)); - incrementCounter(opType, replicaIndex); - } finally { - unlock(lock); - } + public boolean completeDeleteReplica(ContainerID containerID, DatanodeDetails target, + int replicaIndex) { + return completeOp(DELETE, containerID, target, replicaIndex); } private boolean completeOp(ContainerReplicaOp.PendingOpType opType, ContainerID containerID, DatanodeDetails target, int replicaIndex) { - boolean found = false; - // List of completed ops that subscribers will be notified about - List completedOps = new ArrayList<>(); - Lock lock = writeLock(containerID); - lock(lock); - try { - List ops = pendingOps.get(containerID); - if (ops != null) { - Iterator iterator = ops.listIterator(); - while (iterator.hasNext()) { - ContainerReplicaOp op = iterator.next(); - if (op.getOpType() == opType - && op.getTarget().equals(target) - && op.getReplicaIndex() == replicaIndex) { - found = true; - completedOps.add(op); - iterator.remove(); - decrementCounter(op.getOpType(), replicaIndex); - } + List ops = pendingOps.get(containerID); + if (ops != null) { + ContainerReplicaOp foundOp = null; + for (ContainerReplicaOp op : ops) { + if (op.getOpType() == opType && op.getTarget().equals(target) + && op.getReplicaIndex() == replicaIndex) { + foundOp = op; + break; } - if (ops.size() == 0) { + } + if (foundOp != null) { + ops.remove(foundOp); + notifySubscribers(foundOp, containerID, false); + if (ops.isEmpty()) { pendingOps.remove(containerID); } + return true; } - } finally { - unlock(lock); } - - if (found) { - notifySubscribers(completedOps, containerID, false); - } - return found; + return false; } - /** - * Notifies subscribers about the specified ops by calling - * ContainerReplicaPendingOpsSubscriber#opCompleted. - * - * @param ops the ops to send notifications for - * @param containerID the container that ops belong to - * @param timedOut true if the ops (each one) expired, false if they completed - */ - private void notifySubscribers(List ops, - ContainerID containerID, boolean timedOut) { - for (ContainerReplicaOp op : ops) { - for (ContainerReplicaPendingOpsSubscriber subscriber : subscribers) { - subscriber.opCompleted(op, containerID, timedOut); + public void removeExpiredOps(long scmDeadlineEpochMs) { + for (Map.Entry> entry : pendingOps.entrySet()) { + List ops = entry.getValue(); + List expiredOps = new ArrayList<>(); + for (ContainerReplicaOp op : ops) { + if (op.getDeadlineEpochMillis() < scmDeadlineEpochMs) { + expiredOps.add(op); + } + } + for (ContainerReplicaOp op : expiredOps) { + ops.remove(op); + notifySubscribers(op, entry.getKey(), true); + } + if (ops.isEmpty()) { + pendingOps.remove(entry.getKey()); } } } - /** - * Registers a subscriber that will be notified about completed ops. - * - * @param subscriber object that wants to subscribe - */ - public void registerSubscriber( - ContainerReplicaPendingOpsSubscriber subscriber) { - subscribers.add(subscriber); - } - - private Lock writeLock(ContainerID containerID) { - return stripedLock.get(containerID).writeLock(); - } - - private Lock readLock(ContainerID containerID) { - return stripedLock.get(containerID).readLock(); - } - - private void lock(Lock lock) { - // We always take the global lock in shared / read mode as the only time it - // will block is when the class is getting cleared to remove all operations. - globalLock.readLock().lock(); - lock.lock(); - } - - private void unlock(Lock lock) { - globalLock.readLock().unlock(); - lock.unlock(); - } - - private boolean isMetricsNotNull() { - return replicationMetrics != null; - } - - public void setReplicationMetrics( - ReplicationManagerMetrics replicationMetrics) { - this.replicationMetrics = replicationMetrics; + private void notifySubscribers(ContainerReplicaOp op, ContainerID containerID, + boolean timedOut) { + for (ContainerReplicaPendingOpsSubscriber subscriber : subscribers) { + subscriber.opCompleted(op, containerID, timedOut); + } } - public long getPendingOpCount(PendingOpType opType) { - AtomicLong[] counters = pendingOpCount.get(opType); + public long getPendingOpCount(ContainerReplicaOp.PendingOpType opType, + HddsProtos.ReplicationType replicationType) { long count = 0; - for (AtomicLong counter : counters) { - count += counter.get(); + for (List ops : pendingOps.values()) { + for (ContainerReplicaOp op : ops) { + if (op.getOpType() == opType) { + if (replicationType == HddsProtos.ReplicationType.EC && op.getReplicaIndex() > 0) { + count++; + } else if (replicationType == HddsProtos.ReplicationType.RATIS && op.getReplicaIndex() == 0) { + count++; + } + } + } } return count; } - public long getPendingOpCount(PendingOpType opType, ReplicationType type) { - int index = RATIS_COUNTER_INDEX; - if (type == ReplicationType.EC) { - index = EC_COUNTER_INDEX; - } - return pendingOpCount.get(opType)[index].get(); - } - - private long incrementCounter(PendingOpType type, int replicaIndex) { - return pendingOpCount.get(type)[counterIndex(replicaIndex)] - .incrementAndGet(); - } - - private long decrementCounter(PendingOpType type, int replicaIndex) { - return pendingOpCount.get(type)[counterIndex(replicaIndex)] - .decrementAndGet(); - } - - private int counterIndex(int replicaIndex) { - if (isEC(replicaIndex)) { - return EC_COUNTER_INDEX; - } else { - return RATIS_COUNTER_INDEX; + public long getPendingOpCount(ContainerReplicaOp.PendingOpType opType) { + long count = 0; + for (List ops : pendingOps.values()) { + for (ContainerReplicaOp op : ops) { + if (op.getOpType() == opType) { + count++; + } + } } + return count; } - private boolean isEC(int replicaIndex) { - return replicaIndex > 0; + public void clear() { + pendingOps.clear(); } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java index 2f77891046d7..371cebe32b4d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -68,6 +69,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler { private final long currentContainerSize; private final ReplicationManager replicationManager; private final ReplicationManagerMetrics metrics; + private final java.time.Clock clock; ECUnderReplicationHandler(final PlacementPolicy containerPlacement, final ConfigurationSource conf, ReplicationManager replicationManager) { @@ -77,6 +79,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.replicationManager = replicationManager; this.metrics = replicationManager.getMetrics(); + this.clock = replicationManager.getClock(); } private ContainerPlacementStatus validatePlacement( @@ -88,7 +91,8 @@ private ContainerPlacementStatus validatePlacement( container.containerID(), replicaNodes, selectedNodes); List nodes = new ArrayList<>(replicaNodes); nodes.addAll(selectedNodes); - return containerPlacement.validateContainerPlacement(nodes, nodes.size()); + return containerPlacement.validateContainerPlacement(nodes, + container.getReplicationConfig().getRequiredNodes()); } /** @@ -149,6 +153,43 @@ public int processAndSendCommands( Map> sources = filterSources(replicas, deletionInFlight); + + // Identify decommissioning indexes that should be reconstructed + // instead of replicated based on source node load. + Set decommissionIndexes = + replicaCount.decommissioningOnlyIndexes(true); + List indexesToReconstruct = new ArrayList<>( + replicaCount.unavailableIndexes(true)); + List indexesToReplicate = new ArrayList<>(); + + int reconstructionThreshold = replicationManager.getConfig() + .getDecommissionEcReconstructionThreshold(); + boolean reconstructionEnabled = replicationManager.getConfig() + .isDecommissionEcReconstructionEnabled(); + + for (Integer index : decommissionIndexes) { + Pair source = sources.get(index); + if (source != null) { + try { + int queuedCount = replicationManager + .getQueuedReplicationCount(source.getLeft().getDatanodeDetails()); + if (reconstructionEnabled && queuedCount >= reconstructionThreshold) { + LOG.debug("Source node {} for container {} index {} is overloaded " + + "(queued: {} >= threshold: {}). Switching to reconstruction.", + source.getLeft().getDatanodeDetails(), id, index, + queuedCount, reconstructionThreshold); + indexesToReconstruct.add(index); + } else { + indexesToReplicate.add(index); + } + } catch (NodeNotFoundException e) { + LOG.warn("Source node {} not found.", + source.getLeft().getDatanodeDetails()); + indexesToReplicate.add(index); + } + } + } + List availableSourceNodes = sources.values().stream().map(Pair::getLeft) .map(ContainerReplica::getDatanodeDetails) @@ -160,8 +201,9 @@ public int processAndSendCommands( try { IOException firstException = null; try { - commandsSent += processMissingIndexes(replicaCount, sources, - availableSourceNodes, excludedNodes, usedNodes); + commandsSent += processIndexes(replicaCount, sources, + availableSourceNodes, excludedNodes, usedNodes, + indexesToReconstruct); } catch (InsufficientDatanodesException | CommandTargetOverloadedException e) { firstException = e; @@ -169,8 +211,9 @@ public int processAndSendCommands( excludedNodes.addAll(replicationManager.getExcludedNodes()); try { - commandsSent += processDecommissioningIndexes(replicaCount, sources, - availableSourceNodes, excludedNodes, usedNodes); + commandsSent += processReplication(replicaCount, sources, + availableSourceNodes, excludedNodes, usedNodes, + indexesToReplicate); } catch (InsufficientDatanodesException | CommandTargetOverloadedException e) { if (firstException == null) { @@ -197,38 +240,12 @@ public int processAndSendCommands( if (code != SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) { throw e; } - // If we get here, we got an exception indicating the placement policy - // was not able to find ANY nodes to satisfy the replication at one of - // the processing stages (missing index, decom or maint). It is - // possible that some commands were sent to partially fix the - // replication, but a further run will be needed to fix the rest. - // On a small cluster, it is possible that over replication could stop - // nodes getting selected, so to check if that is the case, we run - // the over rep handler, which may free some nodes for the next run. if (replicaCount.isOverReplicated()) { - LOG.debug("Container {} is both under and over replicated. Cannot " + - "find enough target nodes, so handing off to the " + - "OverReplication handler", container); replicationManager.processOverReplicatedContainer(result); } - - /* If we get here, the scenario is: - 1. Under replicated. - 2. Not over replicated. - 3. Placement Policy not able to find enough targets. - Check if there are some UNHEALTHY replicas. In a small cluster, these - UNHEALTHY replicas could block DNs that could otherwise be targets - for new EC replicas. Deleting an UNHEALTHY replica can make its host DN - available as a target. - */ checkAndRemoveUnhealthyReplica(replicaCount, deletionInFlight); - // As we want to re-queue and try again later, we just re-throw throw e; } - if (commandsSent == 0) { - LOG.warn("Container {} is under replicated, but no commands were " + - "created to correct it", id); - } return commandsSent; } @@ -236,9 +253,6 @@ private Map> filterSources( Set replicas, List deletionInFlight) { return replicas.stream().filter(r -> r .getState() == State.CLOSED) - // Exclude stale and dead nodes. This is particularly important for - // maintenance nodes, as the replicas will remain present in the - // container manager, even when they go dead. .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails())) .map(r -> { try { @@ -250,12 +264,6 @@ private Map> filterSources( } }) .filter(pair -> pair.getRight().isHealthy()) - // If there are multiple nodes online for a given index, we just - // pick any IN_SERVICE one. At the moment, the input streams cannot - // handle multiple replicas for the same index, so if we passed them - // all through they would not get used anyway. - // If neither of the nodes are in service, we just pass one through, - // as it will be decommission or maintenance. .collect(Collectors.toMap( pair -> pair.getLeft().getReplicaIndex(), pair -> pair, @@ -269,22 +277,21 @@ private Map> filterSources( } /** - * Processes replicas that are on in-service nodes and should need - * additional copies. + * Processes replicas that should be reconstructed. * @return number of commands sent */ - private int processMissingIndexes( + private int processIndexes( ECContainerReplicaCount replicaCount, Map> sources, List availableSourceNodes, List excludedNodes, - List usedNodes) throws IOException { + List usedNodes, + List missingIndexes) throws IOException { ContainerInfo container = replicaCount.getContainer(); ECReplicationConfig repConfig = (ECReplicationConfig)container.getReplicationConfig(); - List missingIndexes = replicaCount.unavailableIndexes(true); - LOG.debug("Processing missing indexes {} for container {}.", missingIndexes, - container.containerID()); + LOG.debug("Processing indexes {} for reconstruction of container {}.", + missingIndexes, container.containerID()); final int expectedTargetCount = missingIndexes.size(); boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity(); if (expectedTargetCount == 0) { @@ -303,37 +310,24 @@ private int processMissingIndexes( .build()) : excludedNodes; - // placement with overloaded nodes excluded final List selectedDatanodes = getTargetDatanodes( container, expectedTargetCount, usedNodes, excludedOrOverloadedNodes ); final int targetCount = selectedDatanodes.size(); if (hasOverloaded && - // selection allows partial recovery 0 < targetCount && targetCount < expectedTargetCount && - // recovery is not yet critical !recoveryIsCritical) { - - // check if placement exists when overloaded nodes are not excluded final List targetsMaybeOverloaded = getTargetDatanodes( container, expectedTargetCount, usedNodes, excludedNodes); if (targetsMaybeOverloaded.size() == expectedTargetCount) { - final int overloadedCount = expectedTargetCount - targetCount; - LOG.info("Deferring reconstruction of container {}, which requires {}" - + " target nodes to be fully reconstructed, but {} selected" - + " nodes are currently overloaded.", - container.getContainerID(), expectedTargetCount, overloadedCount); metrics.incrECPartialReconstructionSkippedTotal(); throw new InsufficientDatanodesException(expectedTargetCount, targetCount); } } - // If we got less targets than missing indexes, we need to prune the - // missing index list so it only tries to recover the number of indexes - // we have targets for. if (targetCount < expectedTargetCount) { missingIndexes.subList(targetCount, expectedTargetCount).clear(); } @@ -341,18 +335,17 @@ private int processMissingIndexes( ContainerPlacementStatus placementStatusWithSelectedTargets = validatePlacement(container, availableSourceNodes, selectedDatanodes); if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { - LOG.debug("Target nodes + existing nodes for EC container {}" + - " will not satisfy placement policy {}. Reason: {}. Selected" + - " nodes: {}. Available source nodes: {}. Resuming " + - "reconstruction regardless.", + LOG.debug("Target nodes + existing nodes for EC container {} " + + "will not satisfy placement policy {}. Reason: {}. " + + "Selected nodes: {}. Available source nodes: {}. " + + "Resuming recovery regardless.", container.containerID(), containerPlacement.getClass().getName(), placementStatusWithSelectedTargets.misReplicatedReason(), selectedDatanodes, availableSourceNodes); } + if (0 < targetCount) { usedNodes.addAll(selectedDatanodes); - // TODO - what are we adding all the selected nodes to available - // sources? availableSourceNodes.addAll(selectedDatanodes); List sourceDatanodesWithIndex = new ArrayList<>(); @@ -364,28 +357,37 @@ private int processMissingIndexes( src.getLeft().getReplicaIndex())); } + // If any of the missing indexes are decommissioning, we should + // track one of them as the decommission source so we can throttle. + DatanodeDetails decommissionSource = null; + if (!replicationManager.isDecommissionThrottled()) { + Set decomIndexes = replicaCount.decommissioningOnlyIndexes(true); + for (Integer index : missingIndexes) { + if (decomIndexes.contains(index)) { + Pair src = sources.get(index); + if (src != null) { + decommissionSource = src.getLeft().getDatanodeDetails(); + break; + } + } + } + } + final ReconstructECContainersCommand reconstructionCommand = new ReconstructECContainersCommand(container.getContainerID(), sourceDatanodesWithIndex, selectedDatanodes, integers2ByteString(missingIndexes), repConfig); - // This can throw a CommandTargetOverloadedException, but there is no - // point in retrying here. The sources we picked already have the - // overloaded nodes excluded, so we should not get an overloaded - // exception, but it could happen due to other threads adding work to - // the DNs. If it happens here, we just let the exception bubble up. replicationManager.sendThrottledReconstructionCommand( - container, reconstructionCommand); + container, reconstructionCommand, decommissionSource); for (int i = 0; i < missingIndexes.size(); i++) { adjustPendingOps( - replicaCount, selectedDatanodes.get(i), missingIndexes.get(i)); + replicaCount, selectedDatanodes.get(i), missingIndexes.get(i), + decommissionSource); } commandsSent++; } if (targetCount != expectedTargetCount) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully reconstruct container {}. Requested {} received {}", - container.getContainerID(), expectedTargetCount, targetCount); if (hasOverloaded && recoveryIsCritical) { metrics.incrECPartialReconstructionCriticalTotal(); } else { @@ -394,48 +396,28 @@ private int processMissingIndexes( throw new InsufficientDatanodesException(expectedTargetCount, targetCount); } - } else { - LOG.warn("Cannot proceed for EC container reconstruction for {}, due" - + " to insufficient source replicas found. Number of source " - + "replicas needed: {}. Number of available source replicas are:" - + " {}. Available sources are: {}", container.containerID(), - repConfig.getData(), sources.size(), sources); } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); return commandsSent; } - private List getTargetDatanodes( - ContainerInfo container, int requiredNodes, - List usedNodes, - List excludedNodes - ) throws SCMException { - return ReplicationManagerUtil.getTargetDatanodes( - containerPlacement, requiredNodes, - usedNodes, excludedNodes, - currentContainerSize, container); - } - /** - * Processes replicas that are in decommissioning nodes and should need - * additional copies. + * Processes replicas that should be replicated. * @return number of commands sent */ - private int processDecommissioningIndexes( + private int processReplication( ECContainerReplicaCount replicaCount, Map> sources, List availableSourceNodes, - List excludedNodes, List usedNodes) + List excludedNodes, List usedNodes, + List indexesToReplicate) throws IOException { ContainerInfo container = replicaCount.getContainer(); - Set decomIndexes = replicaCount.decommissioningOnlyIndexes(true); int commandsSent = 0; - if (decomIndexes.size() > 0) { - LOG.debug("Processing decommissioning indexes {} for container {}.", - decomIndexes, container.containerID()); + if (!indexesToReplicate.isEmpty()) { + LOG.debug("Processing replication indexes {} for container {}.", + indexesToReplicate, container.containerID()); final List selectedDatanodes = getTargetDatanodes( - container, decomIndexes.size(), usedNodes, excludedNodes); + container, indexesToReplicate.size(), usedNodes, excludedNodes); ContainerPlacementStatus placementStatusWithSelectedTargets = validatePlacement(container, availableSourceNodes, selectedDatanodes); @@ -453,32 +435,37 @@ private int processDecommissioningIndexes( Iterator iterator = selectedDatanodes.iterator(); // In this case we need to do one to one copy. CommandTargetOverloadedException overloadedException = null; - for (Integer decomIndex : decomIndexes) { - Pair source = sources.get(decomIndex); + Set decommissionIndexes = + replicaCount.decommissioningOnlyIndexes(true); + for (Integer index : indexesToReplicate) { + Pair source = sources.get(index); if (source == null) { - LOG.warn("Cannot find source replica for decommissioning index " + - "{} in container {}", decomIndex, container.containerID()); + LOG.warn("Cannot find source replica for index " + + "{} in container {}", index, container.containerID()); continue; } ContainerReplica sourceReplica = source.getLeft(); if (!iterator.hasNext()) { - LOG.warn("Couldn't find enough targets. Available source" - + " nodes: {}, the target nodes: {}, excluded nodes: {}," - + " usedNodes: {}, and the decommission indexes: {}", - sources.values().stream() - .map(Pair::getLeft).collect(Collectors.toSet()), - selectedDatanodes, excludedNodes, usedNodes, decomIndexes); break; } + + DatanodeDetails decommissionSource = null; + if (decommissionIndexes.contains(index)) { + if (replicationManager.isDecommissionThrottled()) { + LOG.debug("Global decommissioning concurrency limit reached. " + + "Skipping replication of index {} for container {}.", + index, container.getContainerID()); + continue; + } + decommissionSource = sourceReplica.getDatanodeDetails(); + } + try { createReplicateCommand( - container, iterator, sourceReplica, replicaCount); + container, iterator, sourceReplica, replicaCount, + decommissionSource); commandsSent++; } catch (CommandTargetOverloadedException e) { - LOG.debug("Unable to send Replicate command for container {}" + - " index {} because the source node {} is overloaded.", - container.getContainerID(), sourceReplica.getReplicaIndex(), - sourceReplica.getDatanodeDetails()); overloadedException = e; } } @@ -486,28 +473,18 @@ private int processDecommissioningIndexes( throw overloadedException; } - if (selectedDatanodes.size() != decomIndexes.size()) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully replicate the decommission indexes for container {}." + - " Requested {} received {}", container.getContainerID(), - decomIndexes.size(), selectedDatanodes.size()); + if (selectedDatanodes.size() != indexesToReplicate.size()) { metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); - throw new InsufficientDatanodesException(decomIndexes.size(), + throw new InsufficientDatanodesException(indexesToReplicate.size(), selectedDatanodes.size()); } } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); return commandsSent; } /** * Processes replicas that are in maintenance nodes and should need * additional copies. - * @param sources Map of Replica Index to a pair of ContainerReplica and - * NodeStatus. This is the list of available replicas. - * @param excludedNodes nodes that should not be targets for new copies - * @return number of commands sent */ private int processMaintenanceOnlyIndexes( ECContainerReplicaCount replicaCount, @@ -520,17 +497,11 @@ private int processMaintenanceOnlyIndexes( } ContainerInfo container = replicaCount.getContainer(); - LOG.debug("Processing maintenance indexes {} for container {}.", - maintIndexes, container.containerID()); - // this many maintenance replicas need another copy int additionalMaintenanceCopiesNeeded = replicaCount.additionalMaintenanceCopiesNeeded(true); if (additionalMaintenanceCopiesNeeded == 0) { return 0; } - LOG.debug("Number of maintenance replicas of container {} that need " + - "additional copies: {}.", container.containerID(), - additionalMaintenanceCopiesNeeded); List targets = getTargetDatanodes( container, maintIndexes.size(), usedNodes, excludedNodes ); @@ -538,7 +509,6 @@ private int processMaintenanceOnlyIndexes( Iterator iterator = targets.iterator(); int commandsSent = 0; - // copy replica from source maintenance DN to a target DN CommandTargetOverloadedException overloadedException = null; for (Integer maintIndex : maintIndexes) { @@ -547,31 +517,18 @@ private int processMaintenanceOnlyIndexes( } Pair source = sources.get(maintIndex); if (source == null) { - LOG.warn("Cannot find source replica for maintenance index " + - "{} in container {}", maintIndex, container.containerID()); continue; } ContainerReplica sourceReplica = source.getLeft(); if (!iterator.hasNext()) { - LOG.warn("Couldn't find enough targets. Available source" - + " nodes: {}, target nodes: {}, excluded nodes: {}," - + " usedNodes: {} and" - + " maintenance indexes: {}", - sources.values().stream() - .map(Pair::getLeft).collect(Collectors.toSet()), - targets, excludedNodes, usedNodes, maintIndexes); break; } try { createReplicateCommand( - container, iterator, sourceReplica, replicaCount); + container, iterator, sourceReplica, replicaCount, null); commandsSent++; additionalMaintenanceCopiesNeeded -= 1; } catch (CommandTargetOverloadedException e) { - LOG.debug("Unable to send Replicate command for container {}" + - " index {} because the source node {} is overloaded.", - container.getContainerID(), sourceReplica.getReplicaIndex(), - sourceReplica.getDatanodeDetails()); overloadedException = e; } } @@ -579,22 +536,29 @@ private int processMaintenanceOnlyIndexes( throw overloadedException; } if (targets.size() != maintIndexes.size()) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully replicate the maintenance indexes for container {}." + - " Requested {} received {}", container.getContainerID(), - maintIndexes.size(), targets.size()); metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); throw new InsufficientDatanodesException(maintIndexes.size(), targets.size()); } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); + return commandsSent; } + private List getTargetDatanodes( + ContainerInfo container, int requiredNodes, + List usedNodes, + List excludedNodes + ) throws SCMException { + return ReplicationManagerUtil.getTargetDatanodes( + containerPlacement, requiredNodes, + usedNodes, excludedNodes, + currentContainerSize, container); + } + private void createReplicateCommand( ContainerInfo container, Iterator iterator, - ContainerReplica replica, ECContainerReplicaCount replicaCount) + ContainerReplica replica, ECContainerReplicaCount replicaCount, + DatanodeDetails decommissionSource) throws CommandTargetOverloadedException, NotLeaderException { final boolean push = replicationManager.getConfig().isPush(); DatanodeDetails source = replica.getDatanodeDetails(); @@ -604,25 +568,26 @@ private void createReplicateCommand( if (push) { replicationManager.sendThrottledReplicationCommand( container, Collections.singletonList(source), target, - replica.getReplicaIndex()); + replica.getReplicaIndex(), decommissionSource); } else { ReplicateContainerCommand replicateCommand = ReplicateContainerCommand.fromSources(containerID, ImmutableList.of(source)); - // For EC containers, we need to track the replica index which is - // to be replicated, so add it to the command. replicateCommand.setReplicaIndex(replica.getReplicaIndex()); replicationManager.sendDatanodeCommand(replicateCommand, container, - target); + target, clock.millis() + replicationManager.getConfig().getEventTimeout(), + decommissionSource); } - adjustPendingOps(replicaCount, target, replica.getReplicaIndex()); + adjustPendingOps(replicaCount, target, replica.getReplicaIndex(), + decommissionSource); } private void adjustPendingOps(ECContainerReplicaCount replicaCount, - DatanodeDetails target, int replicaIndex) { + DatanodeDetails target, int replicaIndex, + DatanodeDetails decommissionSource) { replicaCount.addPendingOp(new ContainerReplicaOp( - ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, - Long.MAX_VALUE)); + ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null, + Long.MAX_VALUE, 0, decommissionSource)); } static ByteString integers2ByteString(List src) { @@ -634,35 +599,15 @@ static ByteString integers2ByteString(List src) { return Proto2Utils.unsafeByteString(dst); } - /** - * Deletes one UNHEALTHY replica so that its host datanode becomes available - * to host a healthy replica. This can be helpful if reconstruction or - * replication is blocked because DNs that follow the placement policy are - * not available as targets. - * @param replicaCount ECContainerReplicaCount object of this container - * @param deletionInFlight pending deletes of this container's replicas - */ private void checkAndRemoveUnhealthyReplica( ECContainerReplicaCount replicaCount, List deletionInFlight) { - LOG.debug("Finding an UNHEALTHY replica of container {} to delete so its " + - "host datanode can be available for replication/reconstruction.", - replicaCount.getContainer()); if (!deletionInFlight.isEmpty()) { - LOG.debug("There are {} pending deletes. Completing them could " + - "free up nodes to fix under replication. Not deleting UNHEALTHY" + - " replicas in this iteration.", deletionInFlight.size()); return; } - - ContainerInfo container = replicaCount.getContainer(); - // ensure that the container is recoverable if (replicaCount.isUnrecoverable()) { - LOG.warn("Cannot recover container {}.", container); return; } - - // don't consider replicas that aren't on IN_SERVICE and HEALTHY DNs Set closedReplicas = new HashSet<>(); Set unhealthyReplicas = new HashSet<>(); for (ContainerReplica replica : replicaCount.getReplicas()) { @@ -673,30 +618,17 @@ private void checkAndRemoveUnhealthyReplica( continue; } } catch (NodeNotFoundException e) { - LOG.debug("Skipping replica {} when trying to unblock under " + - "replication handling.", replica, e); continue; } - if (replica.getState().equals(State.CLOSED)) { - // collect CLOSED replicas for later closedReplicas.add(replica.getReplicaIndex()); } else if (replica.getState().equals(State.UNHEALTHY)) { unhealthyReplicas.add(replica); } } - if (unhealthyReplicas.isEmpty()) { - LOG.debug("Container {} does not have any UNHEALTHY replicas.", - container.containerID()); return; } - - /* - If an index has both an UNHEALTHY and CLOSED replica, prefer deleting the - UNHEALTHY replica of this index and return. Otherwise, delete any UNHEALTHY - replica. - */ for (ContainerReplica unhealthyReplica : unhealthyReplicas) { if (closedReplicas.contains(unhealthyReplica.getReplicaIndex())) { try { @@ -705,17 +637,9 @@ private void checkAndRemoveUnhealthyReplica( unhealthyReplica.getDatanodeDetails(), true); return; } catch (NotLeaderException | CommandTargetOverloadedException e) { - LOG.debug("Skipping sending a delete command for replica {} to " + - "Datanode {}.", unhealthyReplica, - unhealthyReplica.getDatanodeDetails()); } } } - - /* - We didn't delete in the earlier loop - just delete any UNHEALTHY - replica now. - */ for (ContainerReplica unhealthyReplica : unhealthyReplicas) { try { replicationManager.sendThrottledDeleteCommand( @@ -723,11 +647,7 @@ private void checkAndRemoveUnhealthyReplica( unhealthyReplica.getDatanodeDetails(), true); return; } catch (NotLeaderException | CommandTargetOverloadedException e) { - LOG.debug("Skipping sending a delete command for replica {} to " + - "Datanode {}.", unhealthyReplica, - unhealthyReplica.getDatanodeDetails()); } } } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index b43caabd8d86..d278ad5c7b90 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; import org.apache.hadoop.hdds.scm.container.balancer.MoveManager; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber; import org.apache.hadoop.hdds.scm.container.replication.health.ECMisReplicationCheckHandler; import org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler; import org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithUnhealthyReplicasHandler; @@ -82,6 +83,8 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -89,6 +92,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -106,7 +111,8 @@ * that the containers are properly replicated. Replication Manager deals only * with Quasi Closed / Closed container. */ -public class ReplicationManager implements SCMService { +public class ReplicationManager implements SCMService, + ContainerReplicaPendingOpsSubscriber { public static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class); @@ -183,6 +189,26 @@ public class ReplicationManager implements SCMService { private long lastTimeToBeReadyInMillis = 0; private final Clock clock; private final ContainerReplicaPendingOps containerReplicaPendingOps; + private final AtomicInteger decommissionTaskCount = new AtomicInteger(0); + + /** + * Returns true if the global decommissioning task limit has been reached. + */ + public boolean isDecommissionThrottled() { + return decommissionTaskCount.get() >= rmConf.getDecommissionConcurrency(); + } + + public void incrementDecommissionTaskCount() { + decommissionTaskCount.incrementAndGet(); + } + + public void decrementDecommissionTaskCount() { + decommissionTaskCount.decrementAndGet(); + } + + public int getDecommissionTaskCount() { + return decommissionTaskCount.get(); + } private final ECReplicationCheckHandler ecReplicationCheckHandler; private final ECMisReplicationCheckHandler ecMisReplicationCheckHandler; private final RatisReplicationCheckHandler ratisReplicationCheckHandler; @@ -251,6 +277,7 @@ public ReplicationManager(final ConfigurationSource conf, new RatisReplicationCheckHandler(ratisContainerPlacement, this); this.nodeManager = nodeManager; this.metrics = ReplicationManagerMetrics.create(this); + this.containerReplicaPendingOps.registerSubscriber(this); ecUnderReplicationHandler = new ECUnderReplicationHandler( ecContainerPlacement, conf, this); @@ -532,6 +559,14 @@ public void sendThrottledDeleteCommand(final ContainerInfo container, public void sendThrottledReplicationCommand(ContainerInfo containerInfo, List sources, DatanodeDetails target, int replicaIndex) throws CommandTargetOverloadedException, NotLeaderException { + sendThrottledReplicationCommand(containerInfo, sources, target, + replicaIndex, null); + } + + public void sendThrottledReplicationCommand(ContainerInfo containerInfo, + List sources, DatanodeDetails target, int replicaIndex, + DatanodeDetails decommissionSource) + throws CommandTargetOverloadedException, NotLeaderException { long containerID = containerInfo.getContainerID(); List> sourceWithCmds = getAvailableDatanodesForReplication(sources); @@ -547,12 +582,20 @@ public void sendThrottledReplicationCommand(ContainerInfo containerInfo, ReplicateContainerCommand cmd = ReplicateContainerCommand.toTarget(containerID, target); cmd.setReplicaIndex(replicaIndex); - sendDatanodeCommand(cmd, containerInfo, source); + sendDatanodeCommand(cmd, containerInfo, source, + clock.millis() + rmConf.eventTimeout, decommissionSource); } public void sendThrottledReconstructionCommand(ContainerInfo containerInfo, ReconstructECContainersCommand command) throws CommandTargetOverloadedException, NotLeaderException { + sendThrottledReconstructionCommand(containerInfo, command, null); + } + + public void sendThrottledReconstructionCommand(ContainerInfo containerInfo, + ReconstructECContainersCommand command, + DatanodeDetails decommissionSource) + throws CommandTargetOverloadedException, NotLeaderException { List targets = command.getTargetDatanodes(); List> targetWithCmds = getAvailableDatanodesForReplication(targets); @@ -563,7 +606,8 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo, } DatanodeDetails target = selectAndOptionallyExcludeDatanode( rmConf.getReconstructionCommandWeight(), targetWithCmds); - sendDatanodeCommand(command, containerInfo, target); + sendDatanodeCommand(command, containerInfo, target, + clock.millis() + rmConf.eventTimeout, decommissionSource); } private DatanodeDetails selectAndOptionallyExcludeDatanode( @@ -615,7 +659,7 @@ private DatanodeDetails selectAndOptionallyExcludeDatanode( return datanodeWithCommandCount; } - private int getQueuedReplicationCount(DatanodeDetails datanode) + public int getQueuedReplicationCount(DatanodeDetails datanode) throws NodeNotFoundException { Map counts = nodeManager.getTotalDatanodeCommandCounts( datanode, Type.replicateContainerCommand, @@ -677,6 +721,14 @@ public void sendDatanodeCommand(SCMCommand command, ContainerInfo containerInfo, DatanodeDetails target, long scmDeadlineEpochMs) throws NotLeaderException { + sendDatanodeCommand(command, containerInfo, target, scmDeadlineEpochMs, + null); + } + + public void sendDatanodeCommand(SCMCommand command, + ContainerInfo containerInfo, DatanodeDetails target, + long scmDeadlineEpochMs, DatanodeDetails decommissionSource) + throws NotLeaderException { long datanodeDeadline = scmDeadlineEpochMs - rmConf.getDatanodeTimeoutOffset(); LOG.info("Sending command [{}] for container {} to {} with datanode " @@ -687,17 +739,35 @@ public void sendDatanodeCommand(SCMCommand command, command.setDeadline(datanodeDeadline); nodeManager.addDatanodeCommand(target.getUuid(), command); adjustPendingOpsAndMetrics(containerInfo, command, target, - scmDeadlineEpochMs); + scmDeadlineEpochMs, decommissionSource); + } + + @Override + public void opCompleted(ContainerReplicaOp op, ContainerID containerID, + boolean timedOut) { + if (op.getDecommissionSource() != null) { + decrementDecommissionTaskCount(); + } } private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, SCMCommand cmd, DatanodeDetails targetDatanode, long scmDeadlineEpochMs) { + adjustPendingOpsAndMetrics(containerInfo, cmd, targetDatanode, + scmDeadlineEpochMs, null); + } + + private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, + SCMCommand cmd, DatanodeDetails targetDatanode, + long scmDeadlineEpochMs, DatanodeDetails decommissionSource) { + if (decommissionSource != null) { + incrementDecommissionTaskCount(); + } if (cmd.getType() == Type.deleteContainerCommand) { DeleteContainerCommand rcc = (DeleteContainerCommand) cmd; containerReplicaPendingOps.scheduleDeleteReplica( containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), - scmDeadlineEpochMs); + cmd, scmDeadlineEpochMs, decommissionSource); if (rcc.getReplicaIndex() > 0) { getMetrics().incrEcDeletionCmdsSentTotal(); } else if (rcc.getReplicaIndex() == 0) { @@ -711,7 +781,8 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, for (int i = 0; i < targetIndexes.size(); i++) { containerReplicaPendingOps.scheduleAddReplica( containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), - scmDeadlineEpochMs); + cmd, scmDeadlineEpochMs, containerInfo.getUsedBytes(), + clock.millis(), decommissionSource); } getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { @@ -725,7 +796,9 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, */ containerReplicaPendingOps.scheduleAddReplica( containerInfo.containerID(), - targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs); + targetDatanode, rcc.getReplicaIndex(), + cmd, scmDeadlineEpochMs, containerInfo.getUsedBytes(), + clock.millis(), decommissionSource); } else { /* This means the source will push replica to the target, so the op's @@ -733,7 +806,9 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, */ containerReplicaPendingOps.scheduleAddReplica( containerInfo.containerID(), - rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs); + rcc.getTargetDatanode(), rcc.getReplicaIndex(), + cmd, scmDeadlineEpochMs, containerInfo.getUsedBytes(), + clock.millis(), decommissionSource); } if (rcc.getReplicaIndex() > 0) { @@ -1286,10 +1361,65 @@ public int getReconstructionCommandWeight() { ) private int datanodeDeleteLimit = 40; + @Config(key = "decommission.ec.reconstruction.threshold", + type = ConfigType.INT, + defaultValue = "5", + reconfigurable = true, + tags = { SCM, OZONE }, + description = "If the number of in-flight replication tasks for a " + + "decommissioning source datanode exceeds this threshold, " + + "remaining containers will be recovered via reconstruction." + ) + private int decommissionEcReconstructionThreshold = 5; + + @Config(key = "decommission.ec.reconstruction.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "true", + reconfigurable = true, + tags = { SCM, OZONE }, + description = "If enabled, decommissioning EC containers can be recovered " + + "via reconstruction instead of simple replication to speed up the process." + ) + private boolean decommissionEcReconstructionEnabled = true; + + @Config(key = "decommission.concurrency", + type = ConfigType.INT, + defaultValue = "100", + reconfigurable = true, + tags = { SCM, OZONE }, + description = "The maximum number of simultaneous decommissioning " + + "replication/reconstruction tasks across the entire cluster." + ) + private int decommissionConcurrency = 100; + public int getDatanodeDeleteLimit() { return datanodeDeleteLimit; } + public int getDecommissionEcReconstructionThreshold() { + return decommissionEcReconstructionThreshold; + } + + public void setDecommissionEcReconstructionThreshold(int threshold) { + this.decommissionEcReconstructionThreshold = threshold; + } + + public boolean isDecommissionEcReconstructionEnabled() { + return decommissionEcReconstructionEnabled; + } + + public void setDecommissionEcReconstructionEnabled(boolean enabled) { + this.decommissionEcReconstructionEnabled = enabled; + } + + public int getDecommissionConcurrency() { + return decommissionConcurrency; + } + + public void setDecommissionConcurrency(int concurrency) { + this.decommissionConcurrency = concurrency; + } + @Config(key = "inflight.limit.factor", type = ConfigType.DOUBLE, defaultValue = "0.75", diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java index 46b9a350fa39..32504e9b52c5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java @@ -55,6 +55,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.doAnswer; @@ -396,7 +397,7 @@ public static void mockRMSendThrottleReplicateCommand(ReplicationManager mock, commandsSent.add(Pair.of(sources.get(0), command)); return null; }).when(mock).sendThrottledReplicationCommand( - any(ContainerInfo.class), anyList(), any(DatanodeDetails.class), anyInt()); + any(ContainerInfo.class), anyList(), any(DatanodeDetails.class), anyInt(), any()); } /** @@ -422,7 +423,7 @@ public static void mockSendThrottledReconstructionCommand( ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1); commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd)); return null; - }).when(mock).sendThrottledReconstructionCommand(any(ContainerInfo.class), any()); + }).when(mock).sendThrottledReconstructionCommand(any(ContainerInfo.class), any(), any()); } /** @@ -440,7 +441,7 @@ public static void mockRMSendDatanodeCommand(ReplicationManager mock, SCMCommand command = invocationOnMock.getArgument(0); commandsSent.add(Pair.of(target, command)); return null; - }).when(mock).sendDatanodeCommand(any(), any(), any()); + }).when(mock).sendDatanodeCommand(any(), any(), any(), anyLong(), any()); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index f4edb8a4280a..2b521143c024 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -212,7 +212,7 @@ void defersNonCriticalPartialReconstruction(String rep) throws IOException { assertEquals(parity - remainingRedundancy, e.getRequiredNodes()); assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes()); verify(replicationManager, never()) - .sendThrottledReconstructionCommand(any(), any()); + .sendThrottledReconstructionCommand(any(), any(), any()); assertEquals(1, metrics.getECPartialReconstructionSkippedTotal()); } @@ -271,7 +271,7 @@ void performsCriticalPartialReconstruction(String rep) throws IOException { assertEquals(parity - remainingRedundancy, e.getRequiredNodes()); assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes()); verify(replicationManager, times(1)) - .sendThrottledReconstructionCommand(any(), any()); + .sendThrottledReconstructionCommand(any(), any(), any()); assertEquals(1, metrics.getECPartialReconstructionCriticalTotal()); } @@ -300,7 +300,7 @@ void excludesOverloadedNodes() throws IOException { assertEquals(singletonList(excludedByRM), spy.excludedNodes(0)); assertUsedNodes(replicas, spy.usedNodes(0)); verify(replicationManager, times(1)) - .sendThrottledReconstructionCommand(any(), any()); + .sendThrottledReconstructionCommand(any(), any(), any()); } private static void assertUsedNodes(Set replicas, @@ -354,7 +354,7 @@ public void testThrowsWhenTargetsOverloaded() throws IOException { doThrow(new CommandTargetOverloadedException("Overloaded")) .when(replicationManager).sendThrottledReconstructionCommand( - any(), any()); + any(), any(), any()); assertThrows(CommandTargetOverloadedException.class, () -> testUnderReplicationWithMissingIndexes(ImmutableList.of(5), @@ -377,6 +377,45 @@ public void testUnderReplicationWithDecomIndex1() throws IOException { assertEquals(1, cmd.getReplicaIndex()); } + @Test + public void testUnderReplicationWithDecommissionIndexOverloaded() + throws IOException, NodeNotFoundException { + Set availableReplicas = ReplicationTestUtil + .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + + // Mark the source node as overloaded + DatanodeDetails sourceNode = availableReplicas.stream() + .filter(r -> r.getReplicaIndex() == 1).findFirst().get() + .getDatanodeDetails(); + when(replicationManager.getQueuedReplicationCount(sourceNode)) + .thenReturn(10); + replicationManager.getConfig() + .setDecommissionEcReconstructionThreshold(5); + + ECUnderReplicationHandler ecURH = + new ECUnderReplicationHandler( + policy, conf, replicationManager); + UnderReplicatedHealthResult result = + mock(UnderReplicatedHealthResult.class); + when(result.isUnrecoverable()).thenReturn(false); + when(result.getContainerInfo()).thenReturn(container); + + ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(), + result, remainingMaintenanceRedundancy); + + // One reconstruct command should be sent for index 1 + assertEquals(1, commandsSent.size()); + SCMCommand cmd = commandsSent.iterator().next().getValue(); + assertEquals(SCMCommandProto.Type.reconstructECContainersCommand, + cmd.getType()); + ReconstructECContainersCommand reconCmd = + (ReconstructECContainersCommand) cmd; + assertEquals(1, reconCmd.getMissingContainerIndexes().size()); + assertEquals(1, (int) reconCmd.getMissingContainerIndexes().byteAt(0)); + } + // Test used to reproduce the issue reported in HDDS-8171 and then adjusted // to ensure only a single command is sent for HDDS-8172. @@ -431,7 +470,7 @@ public void testUnderReplicationWithDecomNodesOverloaded() Pair.of(IN_SERVICE, 5)); doThrow(new CommandTargetOverloadedException("Overloaded")) .when(replicationManager).sendThrottledReplicationCommand( - any(), anyList(), any(), anyInt()); + any(), anyList(), any(), anyInt(), any()); assertThrows(CommandTargetOverloadedException.class, () -> testUnderReplicationWithMissingIndexes( From 9f30144a431f089fa279e58026cadc1a0ac38da7 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 15 Apr 2026 22:39:03 -0700 Subject: [PATCH 2/2] restore markdown Change-Id: I1ac31515d9035f338ca83c07c309fdfedf911677 --- hadoop-hdds/docs/content/feature/Quota.md | 2 +- hadoop-hdds/docs/content/feature/Quota.zh.md | 2 +- hadoop-hdds/docs/content/security/GDPR.md | 2 +- hadoop-hdds/docs/content/security/GDPR.zh.md | 2 +- hadoop-hdds/docs/content/security/SecureOzone.md | 2 +- hadoop-hdds/docs/content/security/SecureOzone.zh.md | 2 +- hadoop-hdds/docs/content/security/SecuringDatanodes.md | 2 +- hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md | 2 +- hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md | 2 +- hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md | 2 +- hadoop-hdds/docs/content/security/SecuringS3.md | 2 +- hadoop-hdds/docs/content/security/SecuringS3.zh.md | 2 +- hadoop-hdds/docs/content/security/SecuringTDE.md | 2 +- hadoop-hdds/docs/content/security/SecuringTDE.zh.md | 2 +- hadoop-hdds/docs/content/security/SecurityAcls.md | 2 +- hadoop-hdds/docs/content/security/SecurityAcls.zh.md | 2 +- hadoop-hdds/docs/content/security/SecurityWithRanger.md | 2 +- hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md | 2 +- 18 files changed, 18 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/docs/content/feature/Quota.md b/hadoop-hdds/docs/content/feature/Quota.md index 53c196307fa3..90e413357b50 100644 --- a/hadoop-hdds/docs/content/feature/Quota.md +++ b/hadoop-hdds/docs/content/feature/Quota.md @@ -1,6 +1,6 @@ --- title: "Quota in Ozone" -date: "2020-10-22" +date: "2020-October-22" weight: 4 summary: Quota in Ozone icon: user diff --git a/hadoop-hdds/docs/content/feature/Quota.zh.md b/hadoop-hdds/docs/content/feature/Quota.zh.md index d690947ef06c..16e5db26cde3 100644 --- a/hadoop-hdds/docs/content/feature/Quota.zh.md +++ b/hadoop-hdds/docs/content/feature/Quota.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 中的配额" -date: "2020-10-22" +date: "2020-October-22" weight: 4 summary: Ozone中的配额 icon: user diff --git a/hadoop-hdds/docs/content/security/GDPR.md b/hadoop-hdds/docs/content/security/GDPR.md index 409a3ae7be0d..25b2f2c4416b 100644 --- a/hadoop-hdds/docs/content/security/GDPR.md +++ b/hadoop-hdds/docs/content/security/GDPR.md @@ -1,6 +1,6 @@ --- title: "GDPR in Ozone" -date: "2019-09-17" +date: "2019-September-17" weight: 3 icon: user menu: diff --git a/hadoop-hdds/docs/content/security/GDPR.zh.md b/hadoop-hdds/docs/content/security/GDPR.zh.md index 8fd3514138f0..a7db4030871b 100644 --- a/hadoop-hdds/docs/content/security/GDPR.zh.md +++ b/hadoop-hdds/docs/content/security/GDPR.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 中的 GDPR" -date: "2019-09-17" +date: "2019-September-17" weight: 3 summary: Ozone 中的 GDPR menu: diff --git a/hadoop-hdds/docs/content/security/SecureOzone.md b/hadoop-hdds/docs/content/security/SecureOzone.md index bbeef79b6135..76fd74701095 100644 --- a/hadoop-hdds/docs/content/security/SecureOzone.md +++ b/hadoop-hdds/docs/content/security/SecureOzone.md @@ -1,6 +1,6 @@ --- title: "Securing Ozone" -date: "2019-04-03" +date: "2019-April-03" summary: Overview of Ozone security concepts and steps to secure Ozone Manager and SCM. weight: 1 menu: diff --git a/hadoop-hdds/docs/content/security/SecureOzone.zh.md b/hadoop-hdds/docs/content/security/SecureOzone.zh.md index e74b5d8dfab5..a7660233f4d0 100644 --- a/hadoop-hdds/docs/content/security/SecureOzone.zh.md +++ b/hadoop-hdds/docs/content/security/SecureOzone.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 Ozone" -date: "2019-04-03" +date: "2019-April-03" summary: 简要介绍 Ozone 中的安全概念以及安全化 OM 和 SCM 的步骤。 weight: 1 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringDatanodes.md b/hadoop-hdds/docs/content/security/SecuringDatanodes.md index 2254155e1f4e..717e746cfb91 100644 --- a/hadoop-hdds/docs/content/security/SecuringDatanodes.md +++ b/hadoop-hdds/docs/content/security/SecuringDatanodes.md @@ -1,6 +1,6 @@ --- title: "Securing Datanodes" -date: "2019-04-03" +date: "2019-April-03" weight: 3 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md b/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md index 8b37fd2f6ee2..608be16e8a3b 100644 --- a/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringDatanodes.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 Datanode" -date: "2019-04-03" +date: "2019-April-03" weight: 3 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md index a8601d7a5e1f..47c04eb94d93 100644 --- a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md +++ b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.md @@ -1,6 +1,6 @@ --- title: "Securing HTTP" -date: "2020-06-17" +date: "2020-June-17" summary: Secure HTTP web-consoles for Ozone services weight: 4 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md index 5907a7caf9a2..07b3f6164f6f 100644 --- a/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringOzoneHTTP.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 HTTP" -date: "2020-06-17" +date: "2020-June-17" summary: 安全化 Ozone 服务的 HTTP 网络控制台 weight: 4 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringS3.md b/hadoop-hdds/docs/content/security/SecuringS3.md index 04ef6921af65..e6218b95e91e 100644 --- a/hadoop-hdds/docs/content/security/SecuringS3.md +++ b/hadoop-hdds/docs/content/security/SecuringS3.md @@ -1,6 +1,6 @@ --- title: "Securing S3" -date: "2019-04-03" +date: "2019-April-03" summary: Ozone supports S3 protocol, and uses AWS Signature Version 4 protocol which allows a seamless S3 experience. weight: 5 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringS3.zh.md b/hadoop-hdds/docs/content/security/SecuringS3.zh.md index 395b9303354b..218786fd366f 100644 --- a/hadoop-hdds/docs/content/security/SecuringS3.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringS3.zh.md @@ -1,6 +1,6 @@ --- title: "安全化 S3" -date: "2019-04-03" +date: "2019-April-03" summary: Ozone 支持 S3 协议,并使用 AWS Signature Version 4 protocol which allows a seamless S3 experience. weight: 5 diff --git a/hadoop-hdds/docs/content/security/SecuringTDE.md b/hadoop-hdds/docs/content/security/SecuringTDE.md index 0d04a28aec77..3b75bee1bfd5 100644 --- a/hadoop-hdds/docs/content/security/SecuringTDE.md +++ b/hadoop-hdds/docs/content/security/SecuringTDE.md @@ -1,6 +1,6 @@ --- title: "Transparent Data Encryption" -date: "2019-04-03" +date: "2019-April-03" summary: TDE allows data on the disks to be encrypted-at-rest and automatically decrypted during access. weight: 2 menu: diff --git a/hadoop-hdds/docs/content/security/SecuringTDE.zh.md b/hadoop-hdds/docs/content/security/SecuringTDE.zh.md index d7fa4941e446..ed42519e0b25 100644 --- a/hadoop-hdds/docs/content/security/SecuringTDE.zh.md +++ b/hadoop-hdds/docs/content/security/SecuringTDE.zh.md @@ -1,6 +1,6 @@ --- title: "透明数据加密" -date: "2019-04-03" +date: "2019-April-03" summary: 透明数据加密(Transparent Data Encryption,TDE)以密文形式在磁盘上保存数据,但可以在用户访问的时候自动进行解密。 weight: 2 menu: diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.md b/hadoop-hdds/docs/content/security/SecurityAcls.md index ee48999ed25d..9976cbbc4fba 100644 --- a/hadoop-hdds/docs/content/security/SecurityAcls.md +++ b/hadoop-hdds/docs/content/security/SecurityAcls.md @@ -1,6 +1,6 @@ --- title: "Ozone ACLs" -date: "2019-04-03" +date: "2019-April-03" weight: 6 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md index 99751cd62da3..3d95fcf0877b 100644 --- a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md +++ b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md @@ -1,6 +1,6 @@ --- title: "Ozone 访问控制列表" -date: "2019-04-03" +date: "2019-April-03" weight: 6 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityWithRanger.md b/hadoop-hdds/docs/content/security/SecurityWithRanger.md index 7dc1895ad3dc..bbbd8c19f32e 100644 --- a/hadoop-hdds/docs/content/security/SecurityWithRanger.md +++ b/hadoop-hdds/docs/content/security/SecurityWithRanger.md @@ -1,6 +1,6 @@ --- title: "Apache Ranger" -date: "2019-04-03" +date: "2019-April-03" weight: 7 menu: main: diff --git a/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md b/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md index 8917c0b84bcf..b7c7b8721bbe 100644 --- a/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md +++ b/hadoop-hdds/docs/content/security/SecurityWithRanger.zh.md @@ -1,6 +1,6 @@ --- title: "Apache Ranger" -date: "2019-04-03" +date: "2019-April-03" weight: 7 menu: main: