Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -72,6 +74,8 @@ public class PredownloadScheduler {
@VisibleForTesting
Set<String> _failedSegments;
private PredownloadMetrics _predownloadMetrics;
private boolean _peerDownloadEnabled;
private String _peerDownloadScheme;
private int _numOfSkippedSegments;
private int _numOfUnableToDownloadSegments;
private int _numOfDownloadSegments;
Expand All @@ -80,7 +84,7 @@ public class PredownloadScheduler {
private List<PredownloadSegmentInfo> _predownloadSegmentInfoList;
private Map<String, PredownloadTableInfo> _tableInfoMap;

public PredownloadScheduler(PropertiesConfiguration properties)
public PredownloadScheduler(PropertiesConfiguration properties, boolean peerDownloadEnabled)
throws Exception {
_properties = properties;
_clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
Expand All @@ -107,6 +111,11 @@ public PredownloadScheduler(PropertiesConfiguration properties)
_failedSegments = ConcurrentHashMap.newKeySet();
_executor = Executors.newFixedThreadPool(predownloadParallelism);
LOGGER.info("Created thread pool with num of threads: {}", predownloadParallelism);

_peerDownloadEnabled = peerDownloadEnabled;
_peerDownloadScheme = _instanceDataManagerConfig.getSegmentPeerDownloadScheme();
LOGGER.info("Peer download enabled: {}, scheme: {}", _peerDownloadEnabled, _peerDownloadScheme);

_numOfSkippedSegments = 0;
_numOfDownloadSegments = 0;
}
Expand Down Expand Up @@ -301,23 +310,15 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo)
throws Exception {
try {
long startTime = System.currentTimeMillis();
File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo);
if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar()
&& predownloadSegmentInfo.getCrypterName() == null) {
try {
// TODO: increase rate limit here
File untaredSegDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir,
_instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
moveSegment(predownloadSegmentInfo, untaredSegDir);
} finally {
FileUtils.deleteQuietly(tempRootDir);
}
} else {
try {
File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir);
untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir);
} finally {
FileUtils.deleteQuietly(tempRootDir);
try {
downloadFromDeepStore(predownloadSegmentInfo);
} catch (Exception e) {
if (_peerDownloadEnabled && _peerDownloadScheme != null) {
LOGGER.warn("Deep store download failed for segment: {} of table: {}, falling back to peer download",
predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType(), e);
downloadFromPeers(predownloadSegmentInfo);
} else {
throw e;
}
}
_failedSegments.remove(predownloadSegmentInfo.getSegmentName());
Expand All @@ -336,6 +337,52 @@ void downloadSegment(PredownloadSegmentInfo predownloadSegmentInfo)
}
}

/**
 * Downloads one segment through the regular (non-peer) download path and moves it into place.
 *
 * <p>Two strategies are used, selected from the instance config and the segment's crypter:
 * <ul>
 *   <li>stream download-untar is enabled and the segment has no crypter: the segment is fetched via
 *       {@code downloadAndStreamUntarWithRateLimit} and the resulting directory is moved;</li>
 *   <li>otherwise: the tarball is fetched via {@code downloadAndDecrypt} and then untarred and moved.</li>
 * </ul>
 * The temporary working directory is deleted on both success and failure.
 *
 * @param predownloadSegmentInfo the segment to download
 * @throws Exception whatever the download, untar or move helpers throw
 */
private void downloadFromDeepStore(PredownloadSegmentInfo predownloadSegmentInfo)
    throws Exception {
  File workDir = getTmpSegmentDataDir(predownloadSegmentInfo);
  // Decide the strategy before entering the try so that a failure here does not trigger the cleanup.
  boolean streamAndUntar = _instanceDataManagerConfig.isStreamSegmentDownloadUntar()
      && predownloadSegmentInfo.getCrypterName() == null;
  try {
    if (streamAndUntar) {
      File segmentDir = downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, workDir,
          _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
      moveSegment(predownloadSegmentInfo, segmentDir);
    } else {
      File downloadedTar = downloadAndDecrypt(predownloadSegmentInfo, workDir);
      untarAndMoveSegment(predownloadSegmentInfo, downloadedTar, workDir);
    }
  } finally {
    // Best-effort cleanup of the scratch directory regardless of outcome.
    FileUtils.deleteQuietly(workDir);
  }
}

/**
 * Downloads one segment from peer servers, then untars it and moves it into place.
 *
 * <p>The candidate peer URIs are looked up through the ZK client each time the fetcher invokes the supplier, and
 * are shuffled before being handed back. The tarball is written to a temporary directory, which is deleted on
 * both success and failure.
 *
 * @param predownloadSegmentInfo the segment to download
 * @throws Exception whatever the fetch, untar or move helpers throw
 */
private void downloadFromPeers(PredownloadSegmentInfo predownloadSegmentInfo)
    throws Exception {
  final String segment = predownloadSegmentInfo.getSegmentName();
  final String table = predownloadSegmentInfo.getTableNameWithType();
  LOGGER.info("Downloading segment: {} of table: {} from peers using scheme: {}", segment, table,
      _peerDownloadScheme);

  File workDir = getTmpSegmentDataDir(predownloadSegmentInfo);
  try {
    String tarFileName = segment + TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
    File downloadedTar = new File(workDir, tarFileName);

    SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segment, _peerDownloadScheme, () -> {
      // Re-resolve the peers on every invocation and randomize their order.
      List<URI> candidates =
          _predownloadZkClient.getPeerServerURIs(_predownloadZkClient.getDataAccessor(), table, segment,
              _peerDownloadScheme);
      Collections.shuffle(candidates);
      return candidates;
    }, downloadedTar, predownloadSegmentInfo.getCrypterName());

    LOGGER.info("Downloaded segment: {} from peers to: {}, file length: {}", segment, downloadedTar,
        downloadedTar.length());
    untarAndMoveSegment(predownloadSegmentInfo, downloadedTar, workDir);
  } finally {
    // Best-effort cleanup of the scratch directory regardless of outcome.
    FileUtils.deleteQuietly(workDir);
  }
}

private File getTmpSegmentDataDir(PredownloadSegmentInfo predownloadSegmentInfo)
throws Exception {
PredownloadTableInfo predownloadTableInfo = _tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType());
Expand Down Expand Up @@ -395,7 +442,6 @@ File downloadAndDecrypt(PredownloadSegmentInfo predownloadSegmentInfo, File temp
} catch (AttemptsExceededException e) {
LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
tableNameWithType, uri, tarFile);
// TODO: add download from peer logic
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.server.predownload;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,7 @@
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
Expand Down Expand Up @@ -147,6 +149,51 @@ public List<PredownloadSegmentInfo> getSegmentsOfInstance(HelixDataAccessor acce
return predownloadSegmentInfos;
}

/**
 * Returns download URIs for the servers that the table's ExternalView reports as ONLINE for the given segment.
 * Uses ExternalView to discover peers — mirrors PeerServerSegmentFinder without requiring HelixManager.
 *
 * <p>NOTE: this method does not filter out the instance it is running on. If the current instance is listed as
 * ONLINE for the segment in the ExternalView, its URI is part of the result; callers that must avoid
 * self-download need to filter it themselves.
 *
 * <p>Instances without an instance config, or whose URI cannot be constructed, are logged and skipped.
 *
 * @param accessor Helix data accessor used to read the ExternalView and instance configs
 * @param tableNameWithType table name with type suffix; used as the ExternalView resource name and URI path part
 * @param segmentName segment to locate
 * @param downloadScheme URI scheme; {@code CommonConstants.HTTP_PROTOCOL} selects the admin HTTP port, any other
 *     value selects the admin HTTPS port
 * @return a new mutable list of peer URIs, empty when the ExternalView or the segment entry is missing; never
 *     {@code null}
 */
public List<URI> getPeerServerURIs(HelixDataAccessor accessor, String tableNameWithType, String segmentName,
    String downloadScheme) {
  org.apache.helix.PropertyKey.Builder keyBuilder = accessor.keyBuilder();
  ExternalView externalView = accessor.getProperty(keyBuilder.externalView(tableNameWithType));
  if (externalView == null) {
    LOGGER.warn("No external view for table: {} when finding peers for segment: {}", tableNameWithType, segmentName);
    return new ArrayList<>();
  }
  Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
  if (instanceStateMap == null) {
    LOGGER.warn("Segment: {} not found in external view of table: {}", segmentName, tableNameWithType);
    return new ArrayList<>();
  }
  String adminPortKey = CommonConstants.HTTP_PROTOCOL.equals(downloadScheme)
      ? CommonConstants.Helix.Instance.ADMIN_PORT_KEY
      : CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY;
  List<URI> peerURIs = new ArrayList<>();
  for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
    String instanceId = entry.getKey();
    if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(entry.getValue())) {
      continue;
    }
    InstanceConfig instanceConfig = accessor.getProperty(keyBuilder.instanceConfig(instanceId));
    if (instanceConfig == null) {
      LOGGER.warn("Failed to get instance config for peer: {}", instanceId);
      continue;
    }
    String hostName = instanceConfig.getHostName();
    int port = instanceConfig.getRecord()
        .getIntField(adminPortKey, CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
    // Plain concatenation keeps the port in ASCII digits; String.format("%d") would localize digits according
    // to the JVM default locale and could produce an unusable URI.
    String peerUri =
        downloadScheme + "://" + hostName + ":" + port + "/segments/" + tableNameWithType + "/" + segmentName;
    try {
      peerURIs.add(new URI(peerUri));
    } catch (java.net.URISyntaxException e) {
      // Best effort: one malformed peer must not prevent trying the others.
      LOGGER.warn("Failed to construct peer URI for instance: {}", instanceId, e);
    }
  }
  return peerURIs;
}

/**
* Update the segment deepstore metadata from ZK.
*
Expand Down
Loading