Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,6 +89,7 @@ public SuccessResponse reloadSegment(String tableName, String segmentName, boole
long startTimeMs = System.currentTimeMillis();
segmentName = URIUtils.decode(segmentName);
String tableNameWithType = getExistingTable(tableName, segmentName);
validateForceDownloadAllowed(tableNameWithType, forceDownload);
Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload, targetInstance);
boolean zkJobMetaWriteSuccess = false;
Expand Down Expand Up @@ -136,6 +140,9 @@ public SuccessResponse reloadAllSegments(String tableName, @Nullable String tabl
}
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOG);
for (String tableNameWithType : tableNamesWithType) {
validateForceDownloadAllowed(tableNameWithType, forceDownload);
}
if (instanceToSegmentsMapInJson != null) {
Map<String, List<String>> instanceToSegmentsMap =
JsonUtils.stringToObject(instanceToSegmentsMapInJson, new TypeReference<>() {
Expand Down Expand Up @@ -223,6 +230,9 @@ public SuccessResponse reloadSegmentsInTimeRange(String tableName, @Nullable Str
TableType tableTypeFromRequest = resolveTableTypeForReload(tableName, tableTypeStr, forceDownload);
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOG);
for (String tableNameWithType : tableNamesWithType) {
validateForceDownloadAllowed(tableNameWithType, forceDownload);
}
Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
for (String tableNameWithType : tableNamesWithType) {
List<String> segments =
Expand Down Expand Up @@ -331,6 +341,43 @@ public String needReload(String tableNameWithType, boolean verbose, HttpHeaders
}


/**
* Blocks force-download reload on upsert/dedup tables that use TTL-based metadata cleanup.
* Force download rebuilds the segment from the original deep-store copy, which still holds keys that the TTL has
* already expired or deleted, so it would resurrect those keys and return wrong results.
*/
private void validateForceDownloadAllowed(String tableNameWithType, boolean forceDownload) {
if (!forceDownload) {
return;
}
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
// The table name was just resolved from an existing table, so a null config means the read failed. Fail closed
// instead of letting a potentially unsafe force download through.
throw new ControllerApplicationException(LOG,
String.format("Cannot validate forceDownload for table: %s because its table config could not be read",
tableNameWithType), Response.Status.INTERNAL_SERVER_ERROR);
}
if (tableConfig.isUpsertEnabled()) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
if (upsertConfig.getMetadataTTL() > 0 || upsertConfig.getDeletedKeysTTL() > 0) {
throw new ControllerApplicationException(LOG, String.format(
"Cannot reload table: %s with forceDownload=true because upsert metadataTTL/deletedKeysTTL is enabled; "
+ "force download discards the upsert metadata and may resurrect expired or deleted keys",
tableNameWithType), Response.Status.BAD_REQUEST);
}
}
if (tableConfig.isDedupEnabled()) {
DedupConfig dedupConfig = tableConfig.getDedupConfig();
if (dedupConfig.getMetadataTTL() > 0) {
throw new ControllerApplicationException(LOG, String.format(
"Cannot reload table: %s with forceDownload=true because dedup metadataTTL is enabled; "
+ "force download discards the dedup metadata and may resurrect expired keys",
tableNameWithType), Response.Status.BAD_REQUEST);
}
}
}

/**
* Resolves the effective table type for a reload request.
* When forceDownload is requested and no table type is specified (neither in the table name nor the request),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,30 @@
import java.util.Map;
import java.util.concurrent.Executor;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -151,6 +158,8 @@ public void testReloadSegmentsInTimeRangeForceDownloadDefaultsToOffline() throws

when(helixResourceManager.getExistingTableNamesWithType("rawTable", TableType.OFFLINE))
.thenReturn(List.of("rawTable_OFFLINE"));
// Non-upsert, non-dedup table: force download is permitted, so the request proceeds to segment selection.
when(helixResourceManager.getTableConfig("rawTable_OFFLINE")).thenReturn(mock(TableConfig.class));
when(helixResourceManager.getSegmentsFor("rawTable_OFFLINE", true, 0L, 10L, false)).thenReturn(List.of());

ControllerApplicationException exception =
Expand Down Expand Up @@ -356,6 +365,188 @@ public void testReloadSegmentsInTimeRangeRejectsStartNotLessThanEnd() {
verifyNoInteractions(helixResourceManager);
}

@Test
public void testReloadAllSegmentsForceDownloadBlockedForUpsertMetadataTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setMetadataTTL(3600);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isUpsertEnabled()).thenReturn(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
verify(helixResourceManager, never()).reloadAllSegments(anyString(), anyBoolean(), any());
}

@Test
public void testReloadAllSegmentsForceDownloadBlockedForUpsertDeletedKeysTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeletedKeysTTL(3600);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isUpsertEnabled()).thenReturn(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
verify(helixResourceManager, never()).reloadAllSegments(anyString(), anyBoolean(), any());
}

@Test
public void testReloadAllSegmentsForceDownloadBlockedForDedupMetadataTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
DedupConfig dedupConfig = new DedupConfig();
dedupConfig.setMetadataTTL(3600);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isDedupEnabled()).thenReturn(true);
when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
verify(helixResourceManager, never()).reloadAllSegments(anyString(), anyBoolean(), any());
}

@Test
public void testReloadAllSegmentsForceDownloadAllowedForUpsertWithoutTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
// Upsert enabled but no TTL configured: force download must be allowed.
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isUpsertEnabled()).thenReturn(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);
when(helixResourceManager.reloadAllSegments(tableNameWithType, true, null)).thenReturn(Pair.of(1, "job1"));
when(helixResourceManager.addNewReloadAllSegmentsJob(eq(tableNameWithType), eq(null), eq("job1"), anyLong(), eq(1)))
.thenReturn(true);

service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null);

verify(helixResourceManager).reloadAllSegments(tableNameWithType, true, null);
}

@Test
public void testReloadSegmentForceDownloadBlockedForUpsertMetadataTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_OFFLINE";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.OFFLINE))
.thenReturn(List.of(tableNameWithType));
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setMetadataTTL(3600);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isUpsertEnabled()).thenReturn(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadSegment("myTable", "myTable_seg_0", true, null, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
verify(helixResourceManager, never()).reloadSegment(anyString(), anyString(), anyBoolean(), any());
}

@Test
public void testReloadSegmentsInTimeRangeForceDownloadBlockedForUpsertMetadataTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setMetadataTTL(3600);
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isUpsertEnabled()).thenReturn(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadAllSegments("myTable", "REALTIME", true, null, null, "1000", "2000", false, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
verify(helixResourceManager, never()).getSegmentsFor(anyString(), anyBoolean(), anyLong(), anyLong(), anyBoolean());
}

@Test
public void testReloadAllSegmentsForceDownloadFailsClosedWhenTableConfigUnreadable() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
// getTableConfig returns null (transient read failure) for an existing table: must fail closed, not allow.
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(null);

ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
() -> service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null));
assertEquals(exception.getResponse().getStatus(), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
verify(helixResourceManager, never()).reloadAllSegments(anyString(), anyBoolean(), any());
}

@Test
public void testReloadAllSegmentsForceDownloadAllowedForDedupWithoutTtl() throws Exception {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
PinotTableReloadService service = new PinotTableReloadService(helixResourceManager, new ControllerConf(),
mock(Executor.class), mock(HttpClientConnectionManager.class));

String tableNameWithType = "myTable_REALTIME";
when(helixResourceManager.getExistingTableNamesWithType("myTable", TableType.REALTIME))
.thenReturn(List.of(tableNameWithType));
// Dedup enabled but no TTL configured: force download must be allowed.
DedupConfig dedupConfig = new DedupConfig();
TableConfig tableConfig = mock(TableConfig.class);
when(tableConfig.isDedupEnabled()).thenReturn(true);
when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
when(helixResourceManager.getTableConfig(tableNameWithType)).thenReturn(tableConfig);
when(helixResourceManager.reloadAllSegments(tableNameWithType, true, null)).thenReturn(Pair.of(1, "job1"));
when(helixResourceManager.addNewReloadAllSegmentsJob(eq(tableNameWithType), eq(null), eq("job1"), anyLong(), eq(1)))
.thenReturn(true);

service.reloadAllSegments("myTable", "REALTIME", true, null, null, null, null, false, null);

verify(helixResourceManager).reloadAllSegments(tableNameWithType, true, null);
}

@Test
public void testReloadAllSegmentsRejectsExcludeOverlappingWithoutRange() {
PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class);
Expand Down
Loading