diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java index 558f055f152a..78f9615313d4 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java @@ -21,7 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,6 +74,8 @@ public class PredownloadScheduler { @VisibleForTesting Set _failedSegments; private PredownloadMetrics _predownloadMetrics; + private boolean _peerDownloadEnabled; + private String _peerDownloadScheme; private int _numOfSkippedSegments; private int _numOfUnableToDownloadSegments; private int _numOfDownloadSegments; @@ -80,7 +84,7 @@ public class PredownloadScheduler { private List _predownloadSegmentInfoList; private Map _tableInfoMap; - public PredownloadScheduler(PropertiesConfiguration properties) + public PredownloadScheduler(PropertiesConfiguration properties, boolean peerDownloadEnabled) throws Exception { _properties = properties; _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); @@ -107,6 +111,11 @@ public PredownloadScheduler(PropertiesConfiguration properties) _failedSegments = ConcurrentHashMap.newKeySet(); _executor = Executors.newFixedThreadPool(predownloadParallelism); LOGGER.info("Created thread pool with num of threads: {}", predownloadParallelism); + + _peerDownloadEnabled = peerDownloadEnabled; + _peerDownloadScheme = _instanceDataManagerConfig.getSegmentPeerDownloadScheme(); + LOGGER.info("Peer download enabled: {}, scheme: {}", _peerDownloadEnabled, _peerDownloadScheme); + _numOfSkippedSegments = 0; _numOfDownloadSegments = 0; } @@ -301,23 +310,15 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo) throws Exception { try { long startTime = System.currentTimeMillis(); - File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); - if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar() - && predownloadSegmentInfo.getCrypterName() == null) { - try { - // TODO: increase rate limit here - File untaredSegDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir, - _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()); - moveSegment(predownloadSegmentInfo, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir); - untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); + try { + downloadFromDeepStore(predownloadSegmentInfo); + } catch (Exception e) { + if (_peerDownloadEnabled && _peerDownloadScheme != null) { + LOGGER.warn("Deep store download failed for segment: {} of table: {}, falling back to peer download", + predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType(), e); + downloadFromPeers(predownloadSegmentInfo); + } else { + throw e; } } _failedSegments.remove(predownloadSegmentInfo.getSegmentName()); @@ -336,6 +337,52 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo) } } + private void downloadFromDeepStore(PredownloadSegmentInfo predownloadSegmentInfo) + throws Exception { + File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); + if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar() + && predownloadSegmentInfo.getCrypterName() == null) { + try { + File untaredSegDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir, + _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()); + moveSegment(predownloadSegmentInfo, untaredSegDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } else { + try { + File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir); + untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } + } + + private void downloadFromPeers(PredownloadSegmentInfo predownloadSegmentInfo) + throws Exception { + String segmentName = predownloadSegmentInfo.getSegmentName(); + String tableNameWithType = predownloadSegmentInfo.getTableNameWithType(); + LOGGER.info("Downloading segment: {} of table: {} from peers using scheme: {}", segmentName, tableNameWithType, + _peerDownloadScheme); + File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); + try { + File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> { + List peerURIs = + _predownloadZkClient.getPeerServerURIs(_predownloadZkClient.getDataAccessor(), tableNameWithType, + segmentName, _peerDownloadScheme); + Collections.shuffle(peerURIs); + return peerURIs; + }, segmentTarFile, predownloadSegmentInfo.getCrypterName()); + LOGGER.info("Downloaded segment: {} from peers to: {}, file length: {}", segmentName, segmentTarFile, + segmentTarFile.length()); + untarAndMoveSegment(predownloadSegmentInfo, segmentTarFile, tempRootDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } + private File getTmpSegmentDataDir(PredownloadSegmentInfo predownloadSegmentInfo) throws Exception { PredownloadTableInfo predownloadTableInfo = _tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType()); @@ -395,7 +442,6 @@ File downloadAndDecrypt(PredownloadSegmentInfo predownloadSegmentInfo, File temp } catch (AttemptsExceededException e) { LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, tableNameWithType, uri, tarFile); - // TODO: add download from peer logic throw e; } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java index c504af4a65f2..0dfd5809bea8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.predownload; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.store.zk.AutoFallbackPropertyStore; @@ -147,6 +149,51 @@ public List getSegmentsOfInstance(HelixDataAccessor acce return predownloadSegmentInfos; } + /** + * Returns URIs of ONLINE peer servers hosting the given segment, excluding the current instance. + * Uses ExternalView to discover peers — mirrors PeerServerSegmentFinder without requiring HelixManager. + */ + public List getPeerServerURIs(HelixDataAccessor accessor, String tableNameWithType, String segmentName, + String downloadScheme) { + org.apache.helix.PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(tableNameWithType)); + if (externalView == null) { + LOGGER.warn("No external view for table: {} when finding peers for segment: {}", tableNameWithType, segmentName); + return new ArrayList<>(); + } + Map instanceStateMap = externalView.getStateMap(segmentName); + if (instanceStateMap == null) { + LOGGER.warn("Segment: {} not found in external view of table: {}", segmentName, tableNameWithType); + return new ArrayList<>(); + } + String adminPortKey = CommonConstants.HTTP_PROTOCOL.equals(downloadScheme) + ? CommonConstants.Helix.Instance.ADMIN_PORT_KEY + : CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY; + List peerURIs = new ArrayList<>(); + for (Map.Entry entry : instanceStateMap.entrySet()) { + String instanceId = entry.getKey(); + if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(entry.getValue())) { + continue; + } + InstanceConfig instanceConfig = accessor.getProperty(keyBuilder.instanceConfig(instanceId)); + if (instanceConfig == null) { + LOGGER.warn("Failed to get instance config for peer: {}", instanceId); + continue; + } + String hostName = instanceConfig.getHostName(); + int port = instanceConfig.getRecord() + .getIntField(adminPortKey, CommonConstants.Server.DEFAULT_ADMIN_API_PORT); + try { + peerURIs.add(new URI( + String.format("%s://%s:%d/segments/%s/%s", + downloadScheme, hostName, port, tableNameWithType, segmentName))); + } catch (Exception e) { + LOGGER.warn("Failed to construct peer URI for instance: {}", instanceId, e); + } + } + return peerURIs; + } + /** * Update the segment deepstore metadata from ZK. * diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java index a5dca3bd8b5e..2b9f3d89d430 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; @@ -39,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; @@ -51,6 +53,9 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; public class PredownloadSchedulerTest { @@ -71,7 +76,7 @@ public void setUp(PropertiesConfiguration properties) throws Exception { _temporaryFolder = new File(FileUtils.getTempDirectory(), this.getClass().getName()); FileUtils.deleteQuietly(_temporaryFolder); - _predownloadScheduler = spy(new PredownloadScheduler(properties)); + _predownloadScheduler = spy(new PredownloadScheduler(properties, false)); _instanceConfig = new InstanceConfig(INSTANCE_ID); _instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); _instanceConfig.addTag(TAG); @@ -299,7 +304,8 @@ public void downloadSegments() } @Test - public void testPredownloadParallelismConfiguration() throws Exception { + public void testPredownloadParallelismConfiguration() + throws Exception { // Test default parallelism (should use numProcessors * 3) Map defaultProps = Map.of( "pinot.server.instance.id", INSTANCE_ID, @@ -311,7 +317,7 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration defaultConfig = new PropertiesConfiguration(); defaultProps.forEach((key, value) -> defaultConfig.setProperty(key, value)); - PredownloadScheduler defaultScheduler = new PredownloadScheduler(defaultConfig); + PredownloadScheduler defaultScheduler = new PredownloadScheduler(defaultConfig, false); ThreadPoolExecutor defaultExecutor = (ThreadPoolExecutor) defaultScheduler._executor; int expectedDefaultThreads = Runtime.getRuntime().availableProcessors() * 3; assertEquals(defaultExecutor.getCorePoolSize(), expectedDefaultThreads, @@ -331,7 +337,7 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration customConfig = new PropertiesConfiguration(); customProps.forEach((key, value) -> customConfig.setProperty(key, value)); - PredownloadScheduler customScheduler = new PredownloadScheduler(customConfig); + PredownloadScheduler customScheduler = new PredownloadScheduler(customConfig, false); ThreadPoolExecutor customExecutor = (ThreadPoolExecutor) customScheduler._executor; assertEquals(customExecutor.getCorePoolSize(), customParallelism, "Custom parallelism should match configured value"); @@ -349,10 +355,212 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration zeroConfig = new PropertiesConfiguration(); zeroProps.forEach((key, value) -> zeroConfig.setProperty(key, value)); - PredownloadScheduler zeroScheduler = new PredownloadScheduler(zeroConfig); + PredownloadScheduler zeroScheduler = new PredownloadScheduler(zeroConfig, false); ThreadPoolExecutor zeroExecutor = (ThreadPoolExecutor) zeroScheduler._executor; assertEquals(zeroExecutor.getCorePoolSize(), expectedDefaultThreads, "Zero parallelism should fall back to default"); zeroScheduler.stop(); } + + // ── Peer download tests ──────────────────────────────────────────────────── + + private PredownloadScheduler buildPeerEnabledScheduler(PropertiesConfiguration properties) + throws Exception { + PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties, true)); + scheduler._executor = Runnable::run; + + Field schemeField = PredownloadScheduler.class.getDeclaredField("_peerDownloadScheme"); + schemeField.setAccessible(true); + schemeField.set(scheduler, "http"); + + Field metricsField = PredownloadScheduler.class.getDeclaredField("_predownloadMetrics"); + metricsField.setAccessible(true); + metricsField.set(scheduler, mock(PredownloadMetrics.class)); + + return scheduler; + } + + private void injectMockZkClient(PredownloadScheduler scheduler, PredownloadZKClient mockZkClient) + throws Exception { + Field zkField = PredownloadScheduler.class.getDeclaredField("_predownloadZkClient"); + zkField.setAccessible(true); + zkField.set(scheduler, mockZkClient); + } + + private void injectSegmentState(PredownloadScheduler scheduler, + List segments, Map tableInfoMap) + throws Exception { + Field segField = PredownloadScheduler.class.getDeclaredField("_predownloadSegmentInfoList"); + segField.setAccessible(true); + segField.set(scheduler, segments); + + Field mapField = PredownloadScheduler.class.getDeclaredField("_tableInfoMap"); + mapField.setAccessible(true); + mapField.set(scheduler, tableInfoMap); + } + + @Test + public void testPeerDownloadEnabledViaConstructor() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + + Field enabledField = PredownloadScheduler.class.getDeclaredField("_peerDownloadEnabled"); + enabledField.setAccessible(true); + + PredownloadScheduler disabled = new PredownloadScheduler(properties, false); + assertEquals(enabledField.get(disabled), false); + disabled.stop(); + + PredownloadScheduler enabled = new PredownloadScheduler(properties, true); + assertEquals(enabledField.get(enabled), true); + enabled.stop(); + } + + @Test + public void testDownloadSegmentFallbackToPeerOnDeepStoreFailure() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + PredownloadScheduler scheduler = buildPeerEnabledScheduler(properties); + PredownloadZKClient mockZkClient = mock(PredownloadZKClient.class); + when(mockZkClient.getPeerServerURIs(any(), anyString(), anyString(), anyString())).thenReturn(new ArrayList<>()); + injectMockZkClient(scheduler, mockZkClient); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + scheduler._failedSegments.add(SEGMENT_NAME); + + File testFolder = new File(_temporaryFolder, "peerFallbackTest"); + testFolder.mkdirs(); + String dataDir = testFolder.getAbsolutePath(); + int lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + when(_predownloadTableInfo.loadSegmentFromLocal(any())).thenReturn(false); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + // 3-arg (deep store) throws — simulating exhausted deep store retries + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + // 5-arg (peer) succeeds — supplier lambda is not invoked by mock, so no ZK call needed + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal( + anyString(), anyString(), any(), any(File.class), anyString())) + .thenAnswer(inv -> null); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + tarMock.when(() -> TarCompressionUtils.untar(any(File.class), any(File.class))) + .thenAnswer(inv -> { + File untarDir = new File(testFolder, "untared_peer"); + untarDir.mkdirs(); + return List.of(untarDir); + }); + scheduler.downloadSegment(segment); + } + } + + assertFalse(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should be removed from failed set after successful peer download"); + scheduler._executor = null; // lambda can't be cast to ThreadPoolExecutor + scheduler.stop(); + } + + @Test + public void testDownloadSegmentNoPeerFallbackWhenPeerDisabled() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + // peerDownloadEnabled=false — no fallback to peer + PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties, false)); + scheduler._executor = Runnable::run; + Field metricsField = PredownloadScheduler.class.getDeclaredField("_predownloadMetrics"); + metricsField.setAccessible(true); + metricsField.set(scheduler, mock(PredownloadMetrics.class)); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + + File testFolder = new File(_temporaryFolder, "noPeerTest"); + testFolder.mkdirs(); + String dataDir = testFolder.getAbsolutePath(); + int lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + assertThrows(AttemptsExceededException.class, () -> scheduler.downloadSegment(segment)); + } + } + + assertTrue(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should remain in failed set when peer download is disabled"); + scheduler._executor = null; + scheduler.stop(); + } + + @Test + public void testDownloadSegmentBothDeepStoreAndPeerFail() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + PredownloadScheduler scheduler = buildPeerEnabledScheduler(properties); + PredownloadZKClient mockZkClient = mock(PredownloadZKClient.class); + when(mockZkClient.getPeerServerURIs(any(), anyString(), anyString(), anyString())).thenReturn(new ArrayList<>()); + injectMockZkClient(scheduler, mockZkClient); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + + File testFolder = new File(_temporaryFolder, "bothFailTest"); + testFolder.mkdirs(); + String dataDir = testFolder.getAbsolutePath(); + int lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal( + anyString(), anyString(), any(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("peer download failed", 3)); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + assertThrows(AttemptsExceededException.class, () -> scheduler.downloadSegment(segment)); + } + } + + assertTrue(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should be in failed set when both deep store and peer download fail"); + scheduler._executor = null; + scheduler.stop(); + } } diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java index fda970a42727..3867c1927d75 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.predownload; +import java.net.URI; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -25,15 +26,20 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -41,6 +47,7 @@ import static org.apache.pinot.server.predownload.PredownloadTestUtil.*; import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; @@ -182,4 +189,69 @@ public void testUpdateSegmentMetadata() assertEquals(predownloadSegmentInfoList.get(1).getCrc(), 0); } } + + @Test + public void testGetPeerServerURIs() + throws Exception { + // Pass accessor directly — no spy needed since method signature accepts accessor as param + HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); + when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(CLUSTER_NAME)); + + ExternalView externalView = mock(ExternalView.class); + String peerInstanceId = "Server_peer_8098"; + InstanceConfig peerConfig = mock(InstanceConfig.class); + when(peerConfig.getHostName()).thenReturn("peerHost"); + ZNRecord peerRecord = new ZNRecord(peerInstanceId); + peerRecord.setIntField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, 9000); + when(peerConfig.getRecord()).thenReturn(peerRecord); + + // Case 1: ExternalView is null → empty list + doAnswer(inv -> null).when(dataAccessor).getProperty(any(PropertyKey.class)); + assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(), + "Should return empty list when ExternalView is null"); + + // Case 2: ExternalView found but segment has no state map → empty list + doAnswer(inv -> { + PropertyKey key = inv.getArgument(0); + return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : null; + }).when(dataAccessor).getProperty(any(PropertyKey.class)); + when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(null); + assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(), + "Should return empty list when segment not in ExternalView"); + + // Case 3: Only self ONLINE → empty list + // In @BeforeClass: new PredownloadZKClient(ZK_ADDRESS, INSTANCE_ID, CLUSTER_NAME) + // → _instanceName = CLUSTER_NAME + when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(Map.of(CLUSTER_NAME, "ONLINE")); + assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(), + "Should exclude self instance from peer list"); + + // Case 4: peer ONLINE + one OFFLINE → only peer returned (self-exclusion tested in Case 3) + Map stateMap = new HashMap<>(); + stateMap.put(peerInstanceId, "ONLINE"); // peer → included + stateMap.put("Server_offline", "OFFLINE"); // offline → excluded + when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(stateMap); + doAnswer(inv -> { + PropertyKey key = inv.getArgument(0); + return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : peerConfig; + }).when(dataAccessor).getProperty(any(PropertyKey.class)); + + List uris = _predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http"); + assertEquals(uris.size(), 1, "Should return exactly one peer URI"); + String uriStr = uris.get(0).toString(); + assertTrue(uriStr.contains("peerHost"), "URI should contain peer hostname"); + assertTrue(uriStr.contains("9000"), "URI should contain peer admin port"); + assertTrue(uriStr.contains(TABLE_NAME), "URI should contain table name"); + assertTrue(uriStr.contains(SEGMENT_NAME), "URI should contain segment name"); + assertTrue(uriStr.startsWith("http://"), "URI should use http scheme"); + + // Case 5: Peer InstanceConfig is null → skipped, empty list + when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(Map.of(peerInstanceId, "ONLINE")); + doAnswer(inv -> { + PropertyKey key = inv.getArgument(0); + return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : null; + }).when(dataAccessor).getProperty(any(PropertyKey.class)); + assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(), + "Should skip peer when its InstanceConfig is null"); + } }