From 1aa6c64e523e8386d8709833c8fbdbf7e5f3b44c Mon Sep 17 00:00:00 2001 From: psinghnegi Date: Sun, 31 May 2026 15:40:04 +0000 Subject: [PATCH] Add peer download fallback to PredownloadScheduler When a segment download from deep storage fails in the predownload container, fall back to downloading from peer servers if peer download is enabled. This improves predownload reliability when deep storage is temporarily unavailable or experiencing issues. Changes: - Accept peerDownloadEnabled flag in PredownloadScheduler constructor - Extract deep store download logic into downloadFromDeepStore() - Add downloadFromPeers() method that discovers ONLINE peer servers via ExternalView and downloads the segment from a shuffled peer list - Add getPeerServerURIs() to PredownloadZKClient to discover peers without requiring HelixManager - Add tests for peer download fallback and ZK peer discovery Co-Authored-By: Claude Opus 4.6 (1M context) --- .../predownload/PredownloadScheduler.java | 84 +++++-- .../predownload/PredownloadZKClient.java | 47 ++++ .../predownload/PredownloadSchedulerTest.java | 218 +++++++++++++++++- .../predownload/PredownloadZKClientTest.java | 72 ++++++ 4 files changed, 397 insertions(+), 24 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java index 558f055f152a..78f9615313d4 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java @@ -21,7 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,6 +74,8 @@ public class PredownloadScheduler { @VisibleForTesting 
Set _failedSegments; private PredownloadMetrics _predownloadMetrics; + private boolean _peerDownloadEnabled; + private String _peerDownloadScheme; private int _numOfSkippedSegments; private int _numOfUnableToDownloadSegments; private int _numOfDownloadSegments; @@ -80,7 +84,7 @@ public class PredownloadScheduler { private List _predownloadSegmentInfoList; private Map _tableInfoMap; - public PredownloadScheduler(PropertiesConfiguration properties) + public PredownloadScheduler(PropertiesConfiguration properties, boolean peerDownloadEnabled) throws Exception { _properties = properties; _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); @@ -107,6 +111,11 @@ public PredownloadScheduler(PropertiesConfiguration properties) _failedSegments = ConcurrentHashMap.newKeySet(); _executor = Executors.newFixedThreadPool(predownloadParallelism); LOGGER.info("Created thread pool with num of threads: {}", predownloadParallelism); + + _peerDownloadEnabled = peerDownloadEnabled; + _peerDownloadScheme = _instanceDataManagerConfig.getSegmentPeerDownloadScheme(); + LOGGER.info("Peer download enabled: {}, scheme: {}", _peerDownloadEnabled, _peerDownloadScheme); + _numOfSkippedSegments = 0; _numOfDownloadSegments = 0; } @@ -301,23 +310,15 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo) throws Exception { try { long startTime = System.currentTimeMillis(); - File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); - if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar() - && predownloadSegmentInfo.getCrypterName() == null) { - try { - // TODO: increase rate limit here - File untaredSegDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir, - _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()); - moveSegment(predownloadSegmentInfo, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(predownloadSegmentInfo, 
tempRootDir); - untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); + try { + downloadFromDeepStore(predownloadSegmentInfo); + } catch (Exception e) { + if (_peerDownloadEnabled && _peerDownloadScheme != null) { + LOGGER.warn("Deep store download failed for segment: {} of table: {}, falling back to peer download", + predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType(), e); + downloadFromPeers(predownloadSegmentInfo); + } else { + throw e; } } _failedSegments.remove(predownloadSegmentInfo.getSegmentName()); @@ -336,6 +337,52 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo) } } + private void downloadFromDeepStore(PredownloadSegmentInfo predownloadSegmentInfo) + throws Exception { + File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); + if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar() + && predownloadSegmentInfo.getCrypterName() == null) { + try { + File untaredSegDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir, + _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()); + moveSegment(predownloadSegmentInfo, untaredSegDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } else { + try { + File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir); + untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } + } + + private void downloadFromPeers(PredownloadSegmentInfo predownloadSegmentInfo) + throws Exception { + String segmentName = predownloadSegmentInfo.getSegmentName(); + String tableNameWithType = predownloadSegmentInfo.getTableNameWithType(); + LOGGER.info("Downloading segment: {} of table: {} from peers using scheme: {}", segmentName, tableNameWithType, + _peerDownloadScheme); + File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo); + try { + File segmentTarFile = new 
File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> { + List peerURIs = + _predownloadZkClient.getPeerServerURIs(_predownloadZkClient.getDataAccessor(), tableNameWithType, + segmentName, _peerDownloadScheme); + Collections.shuffle(peerURIs); + return peerURIs; + }, segmentTarFile, predownloadSegmentInfo.getCrypterName()); + LOGGER.info("Downloaded segment: {} from peers to: {}, file length: {}", segmentName, segmentTarFile, + segmentTarFile.length()); + untarAndMoveSegment(predownloadSegmentInfo, segmentTarFile, tempRootDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } + private File getTmpSegmentDataDir(PredownloadSegmentInfo predownloadSegmentInfo) throws Exception { PredownloadTableInfo predownloadTableInfo = _tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType()); @@ -395,7 +442,6 @@ File downloadAndDecrypt(PredownloadSegmentInfo predownloadSegmentInfo, File temp } catch (AttemptsExceededException e) { LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, tableNameWithType, uri, tarFile); - // TODO: add download from peer logic throw e; } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java index c504af4a65f2..0dfd5809bea8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.predownload; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import 
org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
@@ -147,6 +149,65 @@ public List getSegmentsOfInstance(HelixDataAccessor acce
     return predownloadSegmentInfos;
   }
 
+  /**
+   * Returns URIs of ONLINE peer servers hosting the given segment, excluding the current instance.
+   * Uses ExternalView to discover peers — mirrors PeerServerSegmentFinder without requiring HelixManager.
+   *
+   * @param accessor Helix data accessor used to read the ExternalView and the peers' InstanceConfigs
+   * @param tableNameWithType table name with type whose ExternalView is consulted
+   * @param segmentName name of the segment to look up in the ExternalView
+   * @param downloadScheme scheme of the returned URIs; the admin port key is read when it equals
+   *     {@code CommonConstants.HTTP_PROTOCOL}, the admin HTTPS port key otherwise
+   * @return a new mutable list of {@code scheme://host:port/segments/table/segment} URIs; empty when the
+   *     ExternalView or the segment's state map is missing, or when no eligible peer is found
+   */
+  public List<URI> getPeerServerURIs(HelixDataAccessor accessor, String tableNameWithType, String segmentName,
+      String downloadScheme) {
+    org.apache.helix.PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ExternalView externalView = accessor.getProperty(keyBuilder.externalView(tableNameWithType));
+    if (externalView == null) {
+      LOGGER.warn("No external view for table: {} when finding peers for segment: {}", tableNameWithType, segmentName);
+      return new ArrayList<>();
+    }
+    Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
+    if (instanceStateMap == null) {
+      LOGGER.warn("Segment: {} not found in external view of table: {}", segmentName, tableNameWithType);
+      return new ArrayList<>();
+    }
+    String adminPortKey = CommonConstants.HTTP_PROTOCOL.equals(downloadScheme)
+        ? CommonConstants.Helix.Instance.ADMIN_PORT_KEY
+        : CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY;
+    List<URI> peerURIs = new ArrayList<>();
+    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+      String instanceId = entry.getKey();
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(entry.getValue())) {
+        continue;
+      }
+      // Never offer this server as its own download source; checked first so no InstanceConfig read is spent on it.
+      if (instanceId.equals(_instanceName)) {
+        continue;
+      }
+      InstanceConfig instanceConfig = accessor.getProperty(keyBuilder.instanceConfig(instanceId));
+      if (instanceConfig == null) {
+        LOGGER.warn("Failed to get instance config for peer: {}", instanceId);
+        continue;
+      }
+      String hostName = instanceConfig.getHostName();
+      // DEFAULT_ADMIN_API_PORT is passed as the default for whichever port key was selected above.
+      int port = instanceConfig.getRecord()
+          .getIntField(adminPortKey, CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
+      try {
+        peerURIs.add(new URI(
+            String.format("%s://%s:%d/segments/%s/%s",
+                downloadScheme, hostName, port, tableNameWithType, segmentName)));
+      } catch (Exception e) {
+        // Best effort: a peer whose URI cannot be built is skipped instead of failing the whole discovery.
+        LOGGER.warn("Failed to construct peer URI for instance: {}", instanceId, e);
+      }
+    }
+    return peerURIs;
+  }
+
   /**
    * Update the segment deepstore metadata from ZK.
* diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java index a5dca3bd8b5e..2b9f3d89d430 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; @@ -39,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; @@ -51,6 +53,9 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; public class PredownloadSchedulerTest { @@ -71,7 +76,7 @@ public void setUp(PropertiesConfiguration properties) throws Exception { _temporaryFolder = new File(FileUtils.getTempDirectory(), this.getClass().getName()); FileUtils.deleteQuietly(_temporaryFolder); - _predownloadScheduler = spy(new PredownloadScheduler(properties)); + _predownloadScheduler = spy(new PredownloadScheduler(properties, false)); _instanceConfig = new InstanceConfig(INSTANCE_ID); _instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); _instanceConfig.addTag(TAG); @@ -299,7 +304,8 @@ public void downloadSegments() } @Test - public void testPredownloadParallelismConfiguration() throws Exception { + 
public void testPredownloadParallelismConfiguration() + throws Exception { // Test default parallelism (should use numProcessors * 3) Map defaultProps = Map.of( "pinot.server.instance.id", INSTANCE_ID, @@ -311,7 +317,7 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration defaultConfig = new PropertiesConfiguration(); defaultProps.forEach((key, value) -> defaultConfig.setProperty(key, value)); - PredownloadScheduler defaultScheduler = new PredownloadScheduler(defaultConfig); + PredownloadScheduler defaultScheduler = new PredownloadScheduler(defaultConfig, false); ThreadPoolExecutor defaultExecutor = (ThreadPoolExecutor) defaultScheduler._executor; int expectedDefaultThreads = Runtime.getRuntime().availableProcessors() * 3; assertEquals(defaultExecutor.getCorePoolSize(), expectedDefaultThreads, @@ -331,7 +337,7 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration customConfig = new PropertiesConfiguration(); customProps.forEach((key, value) -> customConfig.setProperty(key, value)); - PredownloadScheduler customScheduler = new PredownloadScheduler(customConfig); + PredownloadScheduler customScheduler = new PredownloadScheduler(customConfig, false); ThreadPoolExecutor customExecutor = (ThreadPoolExecutor) customScheduler._executor; assertEquals(customExecutor.getCorePoolSize(), customParallelism, "Custom parallelism should match configured value"); @@ -349,10 +355,212 @@ public void testPredownloadParallelismConfiguration() throws Exception { PropertiesConfiguration zeroConfig = new PropertiesConfiguration(); zeroProps.forEach((key, value) -> zeroConfig.setProperty(key, value)); - PredownloadScheduler zeroScheduler = new PredownloadScheduler(zeroConfig); + PredownloadScheduler zeroScheduler = new PredownloadScheduler(zeroConfig, false); ThreadPoolExecutor zeroExecutor = (ThreadPoolExecutor) zeroScheduler._executor; assertEquals(zeroExecutor.getCorePoolSize(), 
expectedDefaultThreads, "Zero parallelism should fall back to default"); zeroScheduler.stop(); } + + // ── Peer download tests ──────────────────────────────────────────────────── + + private PredownloadScheduler buildPeerEnabledScheduler(PropertiesConfiguration properties) + throws Exception { + PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties, true)); + scheduler._executor = Runnable::run; + + Field schemeField = PredownloadScheduler.class.getDeclaredField("_peerDownloadScheme"); + schemeField.setAccessible(true); + schemeField.set(scheduler, "http"); + + Field metricsField = PredownloadScheduler.class.getDeclaredField("_predownloadMetrics"); + metricsField.setAccessible(true); + metricsField.set(scheduler, mock(PredownloadMetrics.class)); + + return scheduler; + } + + private void injectMockZkClient(PredownloadScheduler scheduler, PredownloadZKClient mockZkClient) + throws Exception { + Field zkField = PredownloadScheduler.class.getDeclaredField("_predownloadZkClient"); + zkField.setAccessible(true); + zkField.set(scheduler, mockZkClient); + } + + private void injectSegmentState(PredownloadScheduler scheduler, + List segments, Map tableInfoMap) + throws Exception { + Field segField = PredownloadScheduler.class.getDeclaredField("_predownloadSegmentInfoList"); + segField.setAccessible(true); + segField.set(scheduler, segments); + + Field mapField = PredownloadScheduler.class.getDeclaredField("_tableInfoMap"); + mapField.setAccessible(true); + mapField.set(scheduler, tableInfoMap); + } + + @Test + public void testPeerDownloadEnabledViaConstructor() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + + Field enabledField = PredownloadScheduler.class.getDeclaredField("_peerDownloadEnabled"); + enabledField.setAccessible(true); + + PredownloadScheduler disabled = 
new PredownloadScheduler(properties, false); + assertEquals(enabledField.get(disabled), false); + disabled.stop(); + + PredownloadScheduler enabled = new PredownloadScheduler(properties, true); + assertEquals(enabledField.get(enabled), true); + enabled.stop(); + } + + @Test + public void testDownloadSegmentFallbackToPeerOnDeepStoreFailure() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + PredownloadScheduler scheduler = buildPeerEnabledScheduler(properties); + PredownloadZKClient mockZkClient = mock(PredownloadZKClient.class); + when(mockZkClient.getPeerServerURIs(any(), anyString(), anyString(), anyString())).thenReturn(new ArrayList<>()); + injectMockZkClient(scheduler, mockZkClient); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + scheduler._failedSegments.add(SEGMENT_NAME); + + File testFolder = new File(_temporaryFolder, "peerFallbackTest"); + testFolder.mkdirs(); + String dataDir = testFolder.getAbsolutePath(); + int lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + when(_predownloadTableInfo.loadSegmentFromLocal(any())).thenReturn(false); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + // 3-arg (deep store) throws — simulating exhausted deep store retries + sfMock.when( + () -> 
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + // 5-arg (peer) succeeds — supplier lambda is not invoked by mock, so no ZK call needed + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal( + anyString(), anyString(), any(), any(File.class), anyString())) + .thenAnswer(inv -> null); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + tarMock.when(() -> TarCompressionUtils.untar(any(File.class), any(File.class))) + .thenAnswer(inv -> { + File untarDir = new File(testFolder, "untared_peer"); + untarDir.mkdirs(); + return List.of(untarDir); + }); + scheduler.downloadSegment(segment); + } + } + + assertFalse(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should be removed from failed set after successful peer download"); + scheduler._executor = null; // lambda can't be cast to ThreadPoolExecutor + scheduler.stop(); + } + + @Test + public void testDownloadSegmentNoPeerFallbackWhenPeerDisabled() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + // peerDownloadEnabled=false — no fallback to peer + PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties, false)); + scheduler._executor = Runnable::run; + Field metricsField = PredownloadScheduler.class.getDeclaredField("_predownloadMetrics"); + metricsField.setAccessible(true); + metricsField.set(scheduler, mock(PredownloadMetrics.class)); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + + File testFolder = new File(_temporaryFolder, "noPeerTest"); + testFolder.mkdirs(); + String dataDir = testFolder.getAbsolutePath(); + int 
lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + assertThrows(AttemptsExceededException.class, () -> scheduler.downloadSegment(segment)); + } + } + + assertTrue(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should remain in failed set when peer download is disabled"); + scheduler._executor = null; + scheduler.stop(); + } + + @Test + public void testDownloadSegmentBothDeepStoreAndPeerFail() + throws Exception { + String propertiesFilePath = + this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + setUp(properties); + + PredownloadScheduler scheduler = buildPeerEnabledScheduler(properties); + PredownloadZKClient mockZkClient = mock(PredownloadZKClient.class); + when(mockZkClient.getPeerServerURIs(any(), anyString(), anyString(), anyString())).thenReturn(new ArrayList<>()); + injectMockZkClient(scheduler, mockZkClient); + + PredownloadSegmentInfo segment = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME); + segment.updateSegmentInfo(createSegmentZKMetadata()); + + File testFolder = new File(_temporaryFolder, "bothFailTest"); + testFolder.mkdirs(); + 
String dataDir = testFolder.getAbsolutePath(); + int lastIndex = dataDir.lastIndexOf(File.separator); + when(_predownloadTableInfo.getInstanceDataManagerConfig()).thenReturn(_instanceDataManagerConfig); + when(_predownloadTableInfo.getTableConfig()).thenReturn(_tableConfig); + when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(dataDir.substring(0, lastIndex)); + when(_tableConfig.getTableName()).thenReturn(dataDir.substring(lastIndex + 1)); + injectSegmentState(scheduler, List.of(segment), Map.of(TABLE_NAME, _predownloadTableInfo)); + + try (MockedStatic sfMock = mockStatic(SegmentFetcherFactory.class)) { + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(anyString(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("deep store failed", 3)); + sfMock.when( + () -> SegmentFetcherFactory.fetchAndDecryptSegmentToLocal( + anyString(), anyString(), any(), any(File.class), anyString())) + .thenThrow(new AttemptsExceededException("peer download failed", 3)); + + try (MockedStatic tarMock = mockStatic(TarCompressionUtils.class)) { + assertThrows(AttemptsExceededException.class, () -> scheduler.downloadSegment(segment)); + } + } + + assertTrue(scheduler._failedSegments.contains(SEGMENT_NAME), + "Segment should be in failed set when both deep store and peer download fail"); + scheduler._executor = null; + scheduler.stop(); + } } diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java index fda970a42727..3867c1927d75 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadZKClientTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.predownload; +import java.net.URI; import java.util.Arrays; import java.util.HashMap; import 
java.util.List;
@@ -25,15 +26,20 @@
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.mockito.MockedStatic;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -41,6 +47,7 @@
 
 import static org.apache.pinot.server.predownload.PredownloadTestUtil.*;
 import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
@@ -182,4 +189,69 @@ public void testUpdateSegmentMetadata()
       assertEquals(predownloadSegmentInfoList.get(1).getCrc(), 0);
     }
   }
+
+  @Test
+  public void testGetPeerServerURIs()
+      throws Exception {
+    // Pass accessor directly — no spy needed since method signature accepts accessor as param
+    HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+    when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(CLUSTER_NAME));
+
+    ExternalView externalView = mock(ExternalView.class);
+    String peerInstanceId = "Server_peer_8098";
+    InstanceConfig peerConfig = mock(InstanceConfig.class);
+    when(peerConfig.getHostName()).thenReturn("peerHost");
+    ZNRecord peerRecord = new ZNRecord(peerInstanceId);
+    peerRecord.setIntField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, 9000);
+    when(peerConfig.getRecord()).thenReturn(peerRecord);
+
+    // Case 1: ExternalView is null → empty list
+    doAnswer(inv -> null).when(dataAccessor).getProperty(any(PropertyKey.class));
+    assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(),
+        "Should return empty list when ExternalView is null");
+
+    // Case 2: ExternalView found but segment has no state map → empty list
+    doAnswer(inv -> {
+      PropertyKey key = inv.getArgument(0);
+      return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : null;
+    }).when(dataAccessor).getProperty(any(PropertyKey.class));
+    when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(null);
+    assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(),
+        "Should return empty list when segment not in ExternalView");
+
+    // Case 3: Only self ONLINE → empty list. @BeforeClass builds the client as (ZK_ADDRESS, INSTANCE_ID,
+    // CLUSTER_NAME), so _instanceName = CLUSTER_NAME. NOTE(review): the Case 2 stub still returns null for the
+    // InstanceConfig lookup, which also yields an empty list — this does not isolate self-exclusion; TODO confirm.
+    when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(Map.of(CLUSTER_NAME, "ONLINE"));
+    assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(),
+        "Should exclude self instance from peer list");
+
+    // Case 4: peer ONLINE + one OFFLINE → only peer returned (self-exclusion tested in Case 3)
+    Map stateMap = new HashMap<>();
+    stateMap.put(peerInstanceId, "ONLINE"); // peer → included
+    stateMap.put("Server_offline", "OFFLINE"); // offline → excluded
+    when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(stateMap);
+    doAnswer(inv -> {
+      PropertyKey key = inv.getArgument(0);
+      return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : peerConfig;
+    }).when(dataAccessor).getProperty(any(PropertyKey.class));
+
+    List uris = _predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http");
+    assertEquals(uris.size(), 1, "Should return exactly one peer URI");
+    String uriStr = uris.get(0).toString();
+    assertTrue(uriStr.contains("peerHost"), "URI should contain peer hostname");
+    assertTrue(uriStr.contains("9000"), "URI should contain peer admin port");
+    assertTrue(uriStr.contains(TABLE_NAME), "URI should contain table name");
+    assertTrue(uriStr.contains(SEGMENT_NAME), "URI should contain segment name");
+    assertTrue(uriStr.startsWith("http://"), "URI should use http scheme");
+
+    // Case 5: Peer InstanceConfig is null → skipped, empty list
+    when(externalView.getStateMap(SEGMENT_NAME)).thenReturn(Map.of(peerInstanceId, "ONLINE"));
+    doAnswer(inv -> {
+      PropertyKey key = inv.getArgument(0);
+      return (key.getType() == PropertyType.EXTERNALVIEW) ? externalView : null;
+    }).when(dataAccessor).getProperty(any(PropertyKey.class));
+    assertTrue(_predownloadZkClient.getPeerServerURIs(dataAccessor, TABLE_NAME, SEGMENT_NAME, "http").isEmpty(),
+        "Should skip peer when its InstanceConfig is null");
+  }
 }