diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index b1f68e07f2fb2..adce3afb54b7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -580,17 +580,7 @@ private synchronized void updateRealtimeResourceQuota() { } private void compareAndWriteQuota(String bundle, ResourceQuota oldQuota, ResourceQuota newQuota) throws Exception { - boolean needUpdate = true; - if (!oldQuota.getDynamic() || (Math - .abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN - && Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT - && Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN - && Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT - && Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY)) { - needUpdate = false; - } - - if (needUpdate) { + if (needToUpdateQuota(oldQuota, newQuota)) { log.info(String.format( "Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f," + " bandwidthOut: %.1f, memory: %.1f", @@ -605,6 +595,15 @@ private void compareAndWriteQuota(String bundle, ResourceQuota oldQuota, Resourc } } + static boolean needToUpdateQuota(ResourceQuota oldQuota, ResourceQuota newQuota) { + return oldQuota.getDynamic() + && (Math.abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) >= RESOURCE_QUOTA_MIN_MSGRATE_IN + || Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) >= RESOURCE_QUOTA_MIN_MSGRATE_OUT + || Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthIn()) >= RESOURCE_QUOTA_MIN_BANDWIDTH_IN + || Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) >= RESOURCE_QUOTA_MIN_BANDWIDTH_OUT + || Math.abs(newQuota.getMemory() - oldQuota.getMemory()) >= RESOURCE_QUOTA_MIN_MEMORY); + } + @Override public void writeResourceQuotasToZooKeeper() throws Exception { log.info("Writing namespace bundle resource quotas to ZooKeeper as leader broker"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImplQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImplQuotaTest.java new file mode 100644 index 0000000000000..8a2850c427940 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImplQuotaTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.ResourceQuota; +import org.testng.annotations.Test; + +public class SimpleLoadManagerImplQuotaTest { + + @Test + public void testNeedToUpdateQuotaUsesMatchingBandwidthFields() { + ResourceQuota oldQuota = new ResourceQuota(); + oldQuota.setMsgRateIn(100); + oldQuota.setMsgRateOut(100); + oldQuota.setBandwidthIn(100_000); + oldQuota.setBandwidthOut(200_000); + oldQuota.setMemory(100); + + ResourceQuota newQuota = new ResourceQuota(); + newQuota.setMsgRateIn(oldQuota.getMsgRateIn()); + newQuota.setMsgRateOut(oldQuota.getMsgRateOut()); + newQuota.setBandwidthIn(oldQuota.getBandwidthIn() + 1); + newQuota.setBandwidthOut(oldQuota.getBandwidthOut()); + newQuota.setMemory(oldQuota.getMemory()); + + assertFalse(SimpleLoadManagerImpl.needToUpdateQuota(oldQuota, newQuota)); + + newQuota.setBandwidthIn(oldQuota.getBandwidthIn() + 10_000); + + assertTrue(SimpleLoadManagerImpl.needToUpdateQuota(oldQuota, newQuota)); + } +}